Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
734 changes: 729 additions & 5 deletions package-lock.json

Large diffs are not rendered by default.

25 changes: 22 additions & 3 deletions packages/cli/src/commands/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,29 @@ const mockCommand: CommandModule = {
description: `Provide a seed so that Prism generates dynamic examples deterministically`,
string: true,
demandOption: true,
default: null
default: null,
},
}),
handler: async parsedArgs => {
parsedArgs.jsonSchemaFakerFillProperties = parsedArgs['json-schema-faker-fillProperties'];
const { multiprocess, dynamic, port, host, cors, document, errors, verboseLevel, ignoreExamples, seed, jsonSchemaFakerFillProperties } =
parsedArgs as unknown as CreateMockServerOptions;
parsedArgs.otelExporterUrl = parsedArgs['otel-exporter-url'];
parsedArgs.otelServiceName = parsedArgs['otel-service-name'];
const {
multiprocess,
dynamic,
port,
host,
cors,
document,
errors,
verboseLevel,
ignoreExamples,
seed,
jsonSchemaFakerFillProperties,
telemetry,
otelExporterUrl,
otelServiceName,
} = parsedArgs as unknown as CreateMockServerOptions;

const createPrism = multiprocess ? createMultiProcessPrism : createSingleProcessPrism;
const options = {
Expand All @@ -59,6 +75,9 @@ const mockCommand: CommandModule = {
ignoreExamples,
seed,
jsonSchemaFakerFillProperties,
telemetry,
otelExporterUrl,
otelServiceName,
};

await runPrismAndSetupWatcher(createPrism, options);
Expand Down
9 changes: 7 additions & 2 deletions packages/cli/src/commands/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const proxyCommand: CommandModule = {
.coerce('upstream', (value: string) => {
try {
return new URL(value);
} catch (e) {
} catch {
throw new Error(`Invalid upstream URL provided: ${value}`);
}
})
Expand All @@ -39,6 +39,8 @@ const proxyCommand: CommandModule = {
}),
handler: async parsedArgs => {
parsedArgs.validateRequest = parsedArgs['validate-request'];
parsedArgs.otelExporterUrl = parsedArgs['otel-exporter-url'];
parsedArgs.otelServiceName = parsedArgs['otel-service-name'];
const p: CreateProxyServerOptions = pick(
parsedArgs as unknown as CreateProxyServerOptions,
'dynamic',
Expand All @@ -54,7 +56,10 @@ const proxyCommand: CommandModule = {
'ignoreExamples',
'seed',
'upstreamProxy',
'jsonSchemaFakerFillProperties'
'jsonSchemaFakerFillProperties',
'telemetry',
'otelExporterUrl',
'otelServiceName'
);

const createPrism = p.multiprocess ? createMultiProcessPrism : createSingleProcessPrism;
Expand Down
18 changes: 18 additions & 0 deletions packages/cli/src/commands/sharedOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ const sharedOptions: Dictionary<Options> = {
// custom levels like "success" and "start" are set to the same severity value as "info"
choices: Object.keys(pino.levels.values).concat('silent'),
},

telemetry: {
description: 'Enable OpenTelemetry tracing. Can also be enabled with the PRISM_TELEMETRY env var.',
boolean: true,
default: false,
},

'otel-exporter-url': {
description:
'OTLP collector endpoint, e.g. http://localhost:4318/v1/traces. Falls back to the OTEL_EXPORTER_OTLP_ENDPOINT env var.',
string: true,
},

'otel-service-name': {
description: 'service.name reported to the collector. Falls back to the OTEL_SERVICE_NAME env var, then "prism".',
string: true,
default: 'prism',
},
};

export default sharedOptions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { registerTelemetryShutdown } from '../createServer';
import { ITelemetry } from '@stoplight/prism-http-server';
import { Logger } from 'pino';

describe('registerTelemetryShutdown', () => {
const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM'];

// Remove any listeners our helper registered so tests don't leak handlers into each other.
afterEach(() => {
signals.forEach(signal => process.removeAllListeners(signal));
});

const makeLogger = () => ({ error: jest.fn() }) as unknown as Logger;

it.each(signals)('flushes telemetry and exits when %s is received', async signal => {
const shutdown = jest.fn(() => Promise.resolve());
const telemetry: ITelemetry = { shutdown };
const exit = jest.fn();

registerTelemetryShutdown(telemetry, makeLogger(), exit);

// Drive the registered handler and wait for its async flush to settle.
await Promise.all(process.listeners(signal).map(listener => (listener as () => unknown)()));

expect(shutdown).toHaveBeenCalledTimes(1);
expect(exit).toHaveBeenCalledWith(0);
});

it('only flushes once even if multiple signals fire', async () => {
const shutdown = jest.fn(() => Promise.resolve());
const exit = jest.fn();

registerTelemetryShutdown({ shutdown }, makeLogger(), exit);

await Promise.all(process.listeners('SIGINT').map(listener => (listener as () => unknown)()));
await Promise.all(process.listeners('SIGTERM').map(listener => (listener as () => unknown)()));

expect(shutdown).toHaveBeenCalledTimes(1);
expect(exit).toHaveBeenCalledTimes(1);
});

it('logs an error but still exits when shutdown rejects', async () => {
const shutdown = jest.fn(() => Promise.reject(new Error('export failed')));
const logger = makeLogger();
const exit = jest.fn();

registerTelemetryShutdown({ shutdown }, logger, exit);

await Promise.all(process.listeners('SIGTERM').map(listener => (listener as () => unknown)()));

expect(logger.error).toHaveBeenCalledWith(expect.stringContaining('export failed'));
expect(exit).toHaveBeenCalledWith(0);
});
});
45 changes: 42 additions & 3 deletions packages/cli/src/util/createServer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createLogger } from '@stoplight/prism-core';
import { IHttpConfig, IHttpRequest } from '@stoplight/prism-http';
import { createServer as createHttpServer } from '@stoplight/prism-http-server';
import { createServer as createHttpServer, initTelemetry, ITelemetry } from '@stoplight/prism-http-server';
import * as chalk from 'chalk';
import cluster from 'node:cluster';
import * as E from 'fp-ts/Either';
Expand Down Expand Up @@ -100,10 +100,21 @@ async function createPrismServerWithLogger(options: CreateBaseServerOptions, log
}
: { ...shared, isProxy: false };

const telemetryEnabled = options.telemetry || process.env.PRISM_TELEMETRY === 'true';
if (telemetryEnabled) {
const telemetry = initTelemetry({
enabled: true,
exporterUrl: options.otelExporterUrl,
serviceName: options.otelServiceName,
});
registerTelemetryShutdown(telemetry, logInstance);
}

const server = createHttpServer(operations, {
cors: options.cors,
config,
components: { logger: logInstance.child({ name: 'HTTP SERVER' }) },
telemetry: telemetryEnabled,
});

const address = await server.listen(options.port, options.host);
Expand Down Expand Up @@ -139,8 +150,8 @@ function pipeOutputToSignale(stream: Readable) {
try {
const repairedJson = jsonrepair(chunk);
return JSON.parse(repairedJson);
} catch (error) {
signale.await({ prefix: chalk.bgWhiteBright.black('[CLI]'), message: 'Invalid JSON and unable to correct'});
} catch {
signale.await({ prefix: chalk.bgWhiteBright.black('[CLI]'), message: 'Invalid JSON and unable to correct' });
}
})
)
Expand All @@ -153,6 +164,31 @@ function isProxyServerOptions(options: CreateBaseServerOptions): options is Crea
return 'upstream' in options;
}

/**
* Flushes and shuts down the OpenTelemetry SDK on process termination so that spans buffered by
* the BatchSpanProcessor are exported instead of being dropped when Prism exits. The `exit`
* callback is injectable so the shutdown sequence can be unit-tested without terminating the
* test process.
*/
export function registerTelemetryShutdown(
telemetry: ITelemetry,
logInstance: pino.Logger,
exit: (code: number) => void = code => process.exit(code)
): void {
let shuttingDown = false;
const flushAndExit = () => {
if (shuttingDown) return;
shuttingDown = true;
return telemetry
.shutdown()
.catch((e: Error) => logInstance.error(`Error shutting down OpenTelemetry: ${e.message}`))
.finally(() => exit(0));
};

process.once('SIGINT', flushAndExit);
process.once('SIGTERM', flushAndExit);
}

/**
* @property {boolean} jsonSchemaFakerFillProperties - Used to override the default json-schema-faker extension value
*/
Expand All @@ -168,6 +204,9 @@ type CreateBaseServerOptions = {
ignoreExamples: boolean;
seed: string;
jsonSchemaFakerFillProperties: boolean;
telemetry: boolean;
otelExporterUrl?: string;
otelServiceName?: string;
};

export interface CreateProxyServerOptions extends CreateBaseServerOptions {
Expand Down
6 changes: 6 additions & 0 deletions packages/http-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
"access": "public"
},
"dependencies": {
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/exporter-trace-otlp-http": "^0.219.0",
"@opentelemetry/instrumentation-http": "^0.219.0",
"@opentelemetry/resources": "^2.8.0",
"@opentelemetry/sdk-node": "^0.219.0",
"@opentelemetry/semantic-conventions": "^1.41.0",
"@stoplight/prism-core": "^5.15.11",
"@stoplight/prism-http": "^5.15.11",
"@stoplight/types": "^14.1.0",
Expand Down
32 changes: 32 additions & 0 deletions packages/http-server/src/__tests__/telemetry.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { initTelemetry } from '../telemetry';

describe('initTelemetry', () => {
describe('when disabled', () => {
it('returns a no-op handle without starting the SDK', async () => {
const telemetry = initTelemetry({ enabled: false });

// shutdown should resolve immediately and not throw
await expect(telemetry.shutdown()).resolves.toBeUndefined();
});
});

describe('when enabled', () => {
let telemetry: ReturnType<typeof initTelemetry>;

afterEach(async () => {
if (telemetry) {
await telemetry.shutdown();
}
});

it('starts the SDK and returns a shutdown handle', () => {
telemetry = initTelemetry({
enabled: true,
exporterUrl: 'http://localhost:4318/v1/traces',
serviceName: 'prism-test',
});

expect(typeof telemetry.shutdown).toBe('function');
});
});
});
1 change: 1 addition & 0 deletions packages/http-server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { createServer } from './server';
export { initTelemetry, ITelemetry, ITelemetryConfig } from './telemetry';
38 changes: 36 additions & 2 deletions packages/http-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import { pipe } from 'fp-ts/function';
import * as TE from 'fp-ts/TaskEither';
import * as E from 'fp-ts/Either';
import * as IOE from 'fp-ts/IOEither';
import { SpanStatusCode, trace, type Span } from '@opentelemetry/api';

const tracer = trace.getTracer('@stoplight/prism-http-server');

function searchParamsToNameValues(searchParams: URLSearchParams): IHttpNameValues {
const params = {};
Expand Down Expand Up @@ -81,7 +84,7 @@ function parseRequestBody(request: IncomingMessage) {
export const createServer = (operations: IHttpOperation[], opts: IPrismHttpServerOpts): IPrismHttpServer => {
const { components, config } = opts;

const handler: MicriHandler = async (request, reply) => {
const handleRequest = async (request: IncomingMessage, reply: ServerResponse, span?: Span) => {
const { url, method, headers } = request;

const body = await parseRequestBody(request).catch(async e => {
Expand Down Expand Up @@ -112,14 +115,23 @@ export const createServer = (operations: IHttpOperation[], opts: IPrismHttpServe
body,
};

if (span) {
span.updateName(`${input.method.toUpperCase()} ${input.url.path}`);
span.setAttribute('http.request.method', input.method.toUpperCase());
span.setAttribute('url.path', input.url.path);
}

components.logger.info({ input }, 'Request received');

const requestConfig: E.Either<Error, IHttpConfig> = pipe(
getHttpConfigFromRequest(input),
E.map(operationSpecificConfig => ({ ...config, mock: merge(config.mock, operationSpecificConfig) }))
);

pipe(
// The pipeline writes the response via `send()` itself and resolves to an Either<Error, void>.
// We await it (so the span can stay open until the response is written) but intentionally do not
// return the resolved value, otherwise Micri would try to send it as a second response body.
await pipe(
TE.fromEither(requestConfig),
TE.chain(requestConfig => prism.request(input, operations, requestConfig)),
TE.chainIOEitherK(response => {
Expand Down Expand Up @@ -167,6 +179,8 @@ export const createServer = (operations: IHttpOperation[], opts: IPrismHttpServe
output.statusCode,
serialize(output.body, reply.getHeader('content-type') as string | undefined)
);

if (span) span.setAttribute('http.response.status_code', output.statusCode);
}, E.toError)
);
}),
Expand All @@ -182,11 +196,31 @@ export const createServer = (operations: IHttpOperation[], opts: IPrismHttpServe
reply.end();
}

if (span) {
span.setAttribute('http.response.status_code', e.status || 500);
span.recordException(e);
span.setStatus({ code: SpanStatusCode.ERROR, message: e.message });
}

components.logger.error({ input }, `Request terminated with error: ${e}`);
})
)();
};

const handler: MicriHandler = (request, reply) => {
if (!opts.telemetry) {
return handleRequest(request, reply);
}

return tracer.startActiveSpan('prism.request', async span => {
try {
return await handleRequest(request, reply, span);
} finally {
span.end();
}
});
};

function setCommonCORSHeaders(incomingHeaders: IncomingHttpHeaders, res: ServerResponse) {
res.setHeader('Access-Control-Allow-Origin', incomingHeaders['origin'] || '*');
res.setHeader('Access-Control-Allow-Headers', incomingHeaders['access-control-request-headers'] || '*');
Expand Down
Loading
Loading