Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/core/runTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ export async function runTests(context: Rstest): Promise<void> {
updateSnapshot: context.snapshotManager.options.updateSnapshot,
onCoverageResult: (coverage) => mergedCoverageMap?.merge(coverage),
onTraceEvents: traceRun.onEvents,
coverageMergeWorker: coverageProvider?.coverageMergeWorker,
coverageMergeWorkerStreaming:
coverageProvider?.coverageMergeWorkerStreaming,
});

return {
Expand Down
257 changes: 257 additions & 0 deletions packages/core/src/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { readFile, unlink } from 'node:fs/promises';
import os from 'node:os';
import { monitorEventLoopDelay, performance } from 'node:perf_hooks';
import { fileURLToPath } from 'node:url';
import type { Worker } from 'node:worker_threads';
import type { SnapshotUpdateState } from '@vitest/snapshot';
import { basename, dirname, join, resolve } from 'pathe';
import { getFileTaskId } from '../runtime/runner';
Expand All @@ -22,7 +25,9 @@ import type {
import {
color,
getForceColorEnv,
isDebug,
isDeno,
logger,
needFlagExperimentalDetectModule,
toError,
} from '../utils';
Expand All @@ -39,6 +44,14 @@ const getNumCpus = (): number => {
return os.availableParallelism?.() ?? os.cpus().length;
};

/**
 * Threshold, in per-file coverage payloads, at which the end-of-run coverage
 * ingest is handed to a `worker_threads` pool. Runs that produce fewer files
 * than this are parsed on the host directly, because starting worker threads
 * would cost more than the parse it saves. See issue #1326.
 */
const COVERAGE_MERGE_MIN_FILES = 8;

const parseWorkers = (maxWorkers: string | number): number => {
const parsed = Number.parseInt(maxWorkers.toString(), 10);

Expand Down Expand Up @@ -278,6 +291,19 @@ export const createPool = async ({
onCoverageResult?: (coverage: CoverageMapData) => void;
/** Perfetto trace events forwarded for caller-owned dumping. */
onTraceEvents?: (events: TraceEvent[]) => void;
/**
* Absolute path to the coverage provider's off-main-thread merge worker.
* When set (and enough files are produced), end-of-run coverage ingest runs
* in a `worker_threads` pool instead of on the host event loop. See issue
* #1326.
*/
coverageMergeWorker?: string;
/**
* Whether {@link coverageMergeWorker} supports the streaming ingest
* protocol. Gates the streaming path so a batch-only worker (e.g. v8) is
* never driven in streaming mode. See issue #1326.
*/
coverageMergeWorkerStreaming?: boolean;
}) => Promise<{
results: TestFileResult[];
testResults: TestResult[];
Expand Down Expand Up @@ -369,6 +395,32 @@ export const createPool = async ({
throw `Invalid pool configuration: maxWorkers(${maxWorkers}) cannot be less than minWorkers(${minWorkers}).`;
}

// One-shot environment banner, printed only when debug logging is on
// (`DEBUG=rstest`); nothing below runs otherwise. It records the run
// fingerprint that explains pool utilization (worker kind/count, isolate,
// coverage provider, CPU count + loadavg, free/total memory, heap ceiling,
// node version and platform) so a perf report such as issue #1326 carries its
// own environment details.
if (isDebug()) {
  const coverageConfig = context.normalizedConfig.coverage;
  const { getHeapStatistics } = await import('node:v8');
  const toMb = (bytes: number) => Math.round(bytes / 1024 / 1024);

  // First line: how the pool itself is configured.
  const coverageLabel = coverageConfig?.enabled
    ? coverageConfig.provider
    : 'disabled';
  logger.debug(
    `pool: command=${context.command} workerKind=${workerKind} maxWorkers=${maxWorkers} minWorkers=${minWorkers} isolate=${isolate} coverage=${coverageLabel}`,
  );

  // Second line: the machine the pool is running on.
  const cpuModel = os.cpus()[0]?.model ?? 'unknown';
  const loadAverages = os
    .loadavg()
    .map((n) => n.toFixed(2))
    .join('/');
  const freeMb = toMb(os.freemem());
  const totalMb = toMb(os.totalmem());
  const heapLimitMb = toMb(getHeapStatistics().heap_size_limit);
  logger.debug(
    `pool: cpus=${numCpus} (${cpuModel}) loadavg=${loadAverages} mem(free/total)=${freeMb}/${totalMb}MB heapLimit=${heapLimitMb}MB node=${process.version} ${process.platform}/${process.arch}`,
  );
}

const pool = new Pool({
workerEntry: resolve(__dirname, './worker.js'),
isolate,
Expand Down Expand Up @@ -464,6 +516,8 @@ export const createPool = async ({
updateSnapshot,
onCoverageResult,
onTraceEvents,
coverageMergeWorker,
coverageMergeWorkerStreaming,
}) => {
const projectName = project.name;
const runtimeConfig = getRuntimeConfig(project);
Expand All @@ -473,6 +527,75 @@ export const createPool = async ({
});
const setupAssets = setupEntries.flatMap((entry) => entry.files || []);

// [#1326] Per-file coverage JSON paths reported by test workers. Gathering
// the paths during the run is cheap; reading and merging them is deferred to
// the end-of-run block below, after all workers have finished.
const coverageFiles: string[] = [];

// Diagnostics, active only under `DEBUG=rstest`. The histogram samples host
// event-loop delay for the duration of the run so a slow or under-utilized
// run can be attributed: high delay points at the host loop (e.g. coverage
// merged inline), low delay with low CPU points at worker-side cost
// (instrumentation / fork churn). See #1326.
const diag = isDebug();
const eld = !diag ? undefined : monitorEventLoopDelay({ resolution: 20 });
if (eld) {
  eld.enable();
}

// [#1326 follow-up] Streaming ingest: one long-lived merge thread consumes
// per-file coverage paths AS THEY ARRIVE and unlinks each temp file
// immediately, so the corpus never accumulates on disk and only ONE deduped
// map is ever resident — vs the end-of-run fan-out, which materializes the
// whole corpus and up to N partial maps. The host loop still only handles
// path strings.
// Streaming is gated on a provider CAPABILITY flag, not the mere presence of
// a merge worker — a batch-only worker must never be handed the streaming
// protocol. It is on by default for capable providers;
// `RSTEST_COV_INGEST=batch` forces the end-of-run fan-out instead.
let streamingIngest =
  process.env.RSTEST_COV_INGEST !== 'batch' &&
  !!coverageMergeWorker &&
  coverageMergeWorkerStreaming === true;
let streamWorker: Worker | undefined;
// Settles with the consumer's single merged map, or rejects if the consumer
// errors or exits before posting one.
let streamFinal: Promise<CoverageMapData | undefined> | undefined;
let streamedCount = 0;
if (streamingIngest) {
  try {
    const { Worker } = await import('node:worker_threads');
    const consumer = new Worker(coverageMergeWorker!, {
      workerData: { streaming: true },
    });
    const finalMap = new Promise<CoverageMapData | undefined>(
      (resolveFinal, rejectFinal) => {
        // A promise settles only once, so whichever event fires first wins
        // and later ones are no-ops (e.g. the `exit` that follows a
        // successful `message`).
        consumer.once('message', (m: CoverageMapData) => resolveFinal(m));
        consumer.once('error', rejectFinal);
        // Register `exit` too: a worker that dies without posting
        // (OOM-kill, load failure) rejects the awaiter instead of leaving
        // it pending forever.
        consumer.once('exit', (code) =>
          rejectFinal(
            new Error(`coverage merge worker exited (code ${code})`),
          ),
        );
      },
    );
    // The result is only awaited after every test file has finished. Mark
    // the promise as handled now so a consumer failure DURING the run is not
    // reported as an unhandled rejection (fatal under Node's default mode);
    // the end-of-run `streamFinal.catch(...)` still observes the rejection.
    finalMap.catch(() => {});
    streamWorker = consumer;
    streamFinal = finalMap;
  } catch (error) {
    // Only synchronous setup failures land here (worker_threads unavailable,
    // the constructor rejecting the path). Failures inside the worker arrive
    // later through the `error` / `exit` listeners above. Fall back to the
    // batch path by collecting paths into `coverageFiles`.
    logger.debug(
      `coverage(${projectName}): streaming ingest unavailable (${
        toError(error).message
      }) — falling back to batch ingest`,
    );
    streamingIngest = false;
  }
}

const results = await Promise.all(
entries.map(async (entryInfo, index) => {
const task = await buildTask({
Expand Down Expand Up @@ -500,6 +623,22 @@ export const createPool = async ({
);
});

// [#1326] A result may carry only the PATH of its on-disk coverage instead
// of the coverage object itself. Route that path (cheap) and strip it from
// the result; reading, parsing and merging happen elsewhere, so no per-file
// coverage graph is deserialized here while tests are still running.
const covFile = result.coverageFile;
if (covFile) {
  delete result.coverageFile;
  if (!streamingIngest || !streamWorker) {
    // Batch mode: remember the path for the end-of-run ingest.
    coverageFiles.push(covFile);
  } else {
    // Streaming mode: hand the path to the long-lived consumer right away;
    // the host only posts a string.
    streamWorker.postMessage({ type: 'file', path: covFile });
    streamedCount++;
  }
}
if (result.coverage) {
onCoverageResult?.(result.coverage);
delete result.coverage;
Expand All @@ -514,6 +653,124 @@ export const createPool = async ({
}),
);

// Debug-only: stop sampling and report how long the host event loop was
// delayed while the test files ran.
if (diag && eld) {
  eld.disable();
  // Histogram values are in nanoseconds; print milliseconds, one decimal.
  const toMs = (ns: number) => (ns / 1e6).toFixed(1);
  const meanMs = toMs(eld.mean);
  const p99Ms = toMs(eld.percentile(99));
  const maxMs = toMs(eld.max);
  logger.debug(
    `pool(${projectName}): host event-loop delay during run — mean=${meanMs}ms p99=${p99Ms}ms max=${maxMs}ms (high p99/max ⇒ the host loop is the bottleneck)`,
  );
}

// [#1326 follow-up] Streaming ingest: most merging already happened during
// the run; just drain the consumer's backlog, take the single merged map,
// and tear the consumer down.
if (streamingIngest && streamWorker && streamFinal) {
  const ingestStart = diag ? performance.now() : 0;
  const consumer = streamWorker;
  try {
    consumer.postMessage({ type: 'done' });
    const finalMap = await streamFinal.catch((error) => {
      // The streaming consumer already unlinked the temp files it merged, so
      // we cannot fall back to a batch re-read here. Surface a real WARNING
      // (not a debug-only line) so a partial/empty coverage report is never
      // silent — the user can re-run with RSTEST_COV_INGEST=batch.
      logger.warn(
        `coverage(${projectName}): streaming ingest failed (${
          toError(error).message
        }) — coverage for this project may be incomplete. Re-run with RSTEST_COV_INGEST=batch to use the end-of-run merge.`,
      );
      return undefined;
    });
    if (finalMap) {
      onCoverageResult?.(finalMap);
    }
  } finally {
    // Always stop the consumer thread, even if merging the final map threw;
    // a live worker thread would otherwise outlive this run.
    await consumer.terminate();
  }
  if (diag) {
    logger.debug(
      `coverage(${projectName}): ingest strategy=streaming files=${streamedCount} took=${(performance.now() - ingestStart).toFixed(0)}ms (drain tail)`,
    );
  }
} else if (coverageFiles.length) {
  // [#1326] End-of-run coverage ingest. The test files have all finished
  // (Promise.all resolved), so reading + parsing + merging the per-file
  // coverage is a terminal tail rather than a during-run cost. With a
  // provider-supplied merge worker and enough files, the JSON.parse work
  // runs in a worker_threads pool and only the merged partials cross back
  // to the host; otherwise the host parses the files itself.
  const ingestStart = diag ? performance.now() : 0;
  let ingestStrategy: 'worker-threads' | 'main-thread' = 'main-thread';
  let ingestThreads = 0;
  // Set only when EVERY merge worker delivered its partial.
  let partials: CoverageMapData[] | undefined;
  try {
    if (
      coverageMergeWorker &&
      coverageFiles.length >= COVERAGE_MERGE_MIN_FILES
    ) {
      const workerPath = coverageMergeWorker;
      const mergeWorkers: Worker[] = [];
      try {
        const { Worker } = await import('node:worker_threads');
        const threadCount = Math.min(getNumCpus(), coverageFiles.length);
        ingestThreads = threadCount;
        // Deal the files round-robin into one chunk per thread.
        const chunks: string[][] = Array.from(
          { length: threadCount },
          () => [],
        );
        coverageFiles.forEach((file, i) =>
          chunks[i % threadCount]!.push(file),
        );
        partials = await Promise.all(
          chunks
            .filter((chunk) => chunk.length)
            .map(
              (files) =>
                new Promise<CoverageMapData>((resolveChunk, rejectChunk) => {
                  const worker = new Worker(workerPath, {
                    workerData: { files },
                  });
                  mergeWorkers.push(worker);
                  worker.once('message', (partial: CoverageMapData) =>
                    resolveChunk(partial),
                  );
                  worker.once('error', rejectChunk);
                  // A worker that exits without posting must reject rather
                  // than leave Promise.all pending forever. After a
                  // successful `message` this is a no-op (already settled).
                  worker.once('exit', (code) =>
                    rejectChunk(
                      new Error(
                        `coverage merge worker exited (code ${code})`,
                      ),
                    ),
                  );
                }),
            ),
        );
      } catch (error) {
        // worker_threads unavailable or a merge worker failed: fall back to
        // a host-side parse of the same files below.
        partials = undefined;
        logger.debug(
          `coverage(${projectName}): worker-threads ingest failed (${
            toError(error).message
          }) — falling back to main-thread parse`,
        );
      } finally {
        // Stop every spawned worker, including siblings still running after
        // one of them failed. Termination errors are not actionable here.
        await Promise.all(
          mergeWorkers.map((worker) => worker.terminate().catch(() => {})),
        );
      }
    }
    if (partials) {
      // Merged outside the worker try/catch on purpose: a failure while
      // merging must not trigger the main-thread fallback, which would
      // re-merge the partials that were already applied.
      for (const partial of partials) {
        onCoverageResult?.(partial);
      }
      ingestStrategy = 'worker-threads';
    } else {
      const parsed = await Promise.all(
        coverageFiles.map(
          async (file) =>
            JSON.parse(await readFile(file, 'utf8')) as CoverageMapData,
        ),
      );
      for (const coverage of parsed) {
        onCoverageResult?.(coverage);
      }
    }
  } finally {
    // Best-effort cleanup that also runs when parsing/merging throws, so
    // temp files are not left behind by a failed ingest.
    await Promise.all(
      coverageFiles.map((file) => unlink(file).catch(() => {})),
    );
  }
  if (diag) {
    logger.debug(
      `coverage(${projectName}): ingest strategy=${ingestStrategy}` +
        `${ingestStrategy === 'worker-threads' ? ` threads=${ingestThreads}` : ''}` +
        ` files=${coverageFiles.length} took=${(performance.now() - ingestStart).toFixed(0)}ms`,
    );
  }
}

for (const result of results) {
if (result.snapshotResult) {
context.snapshotManager.add(result.snapshotResult);
Expand Down
27 changes: 23 additions & 4 deletions packages/core/src/runtime/worker/runInPool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { FileCoverageData } from 'istanbul-lib-coverage';
import { randomUUID } from 'node:crypto';
import { writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { isMainThread, threadId } from 'node:worker_threads';
import { join } from 'pathe';
import { install } from 'source-map-support';
import type {
MaybePromise,
Expand Down Expand Up @@ -652,13 +656,28 @@ export const runInPool = async (
outputModule: options.context.outputModule,
});
if (coverageMap) {
  // Flatten the coverage map into a plain object keyed like
  // `coverageMap.toJSON()`: entries that expose `toJSON` are converted
  // through it, all others are stored as they are.
  const covObj: Record<string, FileCoverageData> = {};
  for (const [key, value] of Object.entries(coverageMap.toJSON())) {
    covObj[key] =
      'toJSON' in value ? (value.toJSON() as FileCoverageData) : value;
  }
  // [#1326] When the provider supplies an off-main-thread merge worker,
  // write this file's coverage to a temp file and ship only the PATH, so
  // the receiver does not have to deserialize a full per-file coverage
  // object graph from the result message. The `randomUUID` file name keeps
  // concurrent writers from colliding. Providers without a merge worker
  // keep shipping the coverage object inline on the result.
  if (!coverageProvider.coverageMergeWorker) {
    results.coverage = covObj;
  } else {
    const file = join(tmpdir(), `rstest-cov-${randomUUID()}.json`);
    await writeFile(file, JSON.stringify(covObj));
    results.coverageFile = file;
  }
}
}

Expand Down
Loading