Skip to content

Commit 7f2aa3d

Browse files
authored
feat(core): HTTP server diagnostics channel utility (#20779)
Add platform-portable building blocks that server SDKs use to instrument incoming HTTP requests without depending on OTel HTTP instrumentation: - `getHttpServerSubscriptions`: diagnostics_channel listener for `http.server.request.start`, set up isolation scope, request data, trace continuation, optional body capture and request-session tracking - if `spans: true` option set, then also create the root server span around the request lifecycle, applying static-asset/status-code filtering, `ignoreIncomingRequests` and `onSpanCreated` hooks - `recordRequestSession`: release-health session aggregation per request - `patchRequestToCaptureBody`: opt-in incoming request body capture Add `kind` field on `StartSpanOptions` so OTel-based SDKs can set SpanKind on the underlying span, and update `headersToDict` to allow `number`-valued headers to support Node.js types. Replace inline `instrumentServer` Proxy/emit-wrapping implementation in node-light's HTTP integration with core's `getHttpServerSubscriptions`, which does the same work (isolation scope, request data, body capture, trace continuation, best-effort transaction name). Replace inline `instrumentServer` Proxy/emit-wrapping in node-core's `httpServerIntegration.ts` with core's `getHttpServerSubscriptions`. OTel-specific concerns (header propagation, double-wrap context guard, `_startSpanCallback` dispatch) move into a `wrapServerEmitRequest` callback that `instrumentServer` invokes inside the per-request lifecycle. Re-export `recordRequestSession` from core so existing test continues to pass. Duplicate request-isolation/session/body-capture plumbing removed, logic now lives in `@sentry/core`'s subscription factory.
1 parent b4983b0 commit 7f2aa3d

11 files changed

Lines changed: 863 additions & 355 deletions

File tree

packages/node-core/src/utils/captureRequestBody.ts renamed to packages/core/src/integrations/http/patch-request-to-capture-body.ts

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
import type { IncomingMessage } from 'node:http';
2-
import type { Scope } from '@sentry/core';
3-
import { debug, getMaxBodyByteLength, type MaxRequestBodySize } from '@sentry/core';
4-
import { DEBUG_BUILD } from '../debug-build';
1+
import type { Scope } from '../../scope';
2+
import { debug } from '../../utils/debug-logger';
3+
import { DEBUG_BUILD } from '../../debug-build';
4+
import type { HttpIncomingMessage } from './types';
5+
import { getMaxBodyByteLength, type MaxRequestBodySize } from '../../utils/request';
56

67
/**
78
* This method patches the request object to capture the body.
8-
* Instead of actually consuming the streamed body ourselves, which has potential side effects,
9-
* we monkey patch `req.on('data')` to intercept the body chunks.
10-
* This way, we only read the body if the user also consumes the body, ensuring we do not change any behavior in unexpected ways.
9+
* Instead of actually consuming the streamed body ourselves, which has
10+
* potential side effects, we monkey patch `req.on('data')` to intercept
11+
* the body chunks. This way, we only read the body if the user also consumes
12+
* the body, ensuring we do not change any behavior in unexpected ways.
1113
*/
1214
export function patchRequestToCaptureBody(
13-
req: IncomingMessage,
15+
req: HttpIncomingMessage,
1416
isolationScope: Scope,
1517
maxIncomingRequestBodySize: Exclude<MaxRequestBodySize, 'none'>,
1618
integrationName: string,
@@ -20,18 +22,16 @@ export function patchRequestToCaptureBody(
2022

2123
DEBUG_BUILD && debug.log(integrationName, 'Patching request.on');
2224

23-
/**
24-
* We need to keep track of the original callbacks, in order to be able to remove listeners again.
25-
* Since `off` depends on having the exact same function reference passed in, we need to be able to map
26-
* original listeners to our wrapped ones.
27-
*/
25+
// keep track of the original callbacks to remove listeners later
26+
// `off` depends on having the exact same function reference passed in,
27+
// so we need to be able to map original listeners to our wrapped ones.
2828
const callbackMap = new WeakMap();
2929

3030
const maxBodySize = getMaxBodyByteLength(maxIncomingRequestBodySize);
3131

3232
try {
3333
// eslint-disable-next-line @typescript-eslint/unbound-method
34-
req.on = new Proxy(req.on, {
34+
req.on = req.addListener = new Proxy(req.on, {
3535
apply: (target, thisArg, args: Parameters<typeof req.on>) => {
3636
const [event, listener, ...restArgs] = args;
3737

@@ -73,7 +73,7 @@ export function patchRequestToCaptureBody(
7373

7474
// Ensure we also remove callbacks correctly
7575
// eslint-disable-next-line @typescript-eslint/unbound-method
76-
req.off = new Proxy(req.off, {
76+
req.off = req.removeListener = new Proxy(req.off, {
7777
apply: (target, thisArg, args: Parameters<typeof req.off>) => {
7878
const [, listener] = args;
7979

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import type { Client } from '../../client';
2+
import { debug } from '../../utils/debug-logger';
3+
import { DEBUG_BUILD } from '../../debug-build';
4+
import type { Scope } from '../../scope';
5+
import type { HttpServerResponse } from './types';
6+
import type { AggregationCounts } from '../../types-hoist/session';
7+
import { safeUnref } from '../../utils/timer';
8+
9+
const clientToRequestSessionAggregatesMap = new WeakMap<
10+
Client,
11+
{ [timestampRoundedToSeconds: string]: { exited: number; crashed: number; errored: number } }
12+
>();
13+
14+
/**
15+
* Starts a session and tracks it in the context of a given isolation scope.
16+
* When the passed response is finished, the session is put into a task and
17+
* is aggregated with other sessions that may happen in a certain time window
18+
* (sessionFlushingDelayMs).
19+
*
20+
* The sessions are always aggregated by the client that is on the current
21+
* scope at the time of ending the response (if there is one).
22+
*/
23+
export function recordRequestSession(
24+
client: Client,
25+
{
26+
requestIsolationScope,
27+
response,
28+
sessionFlushingDelayMS,
29+
}: {
30+
requestIsolationScope: Scope;
31+
response: HttpServerResponse;
32+
sessionFlushingDelayMS?: number;
33+
},
34+
): void {
35+
requestIsolationScope.setSDKProcessingMetadata({
36+
requestSession: { status: 'ok' },
37+
});
38+
39+
response.once('close', () => {
40+
const requestSession = requestIsolationScope.getScopeData().sdkProcessingMetadata.requestSession;
41+
42+
if (client && requestSession) {
43+
DEBUG_BUILD && debug.log(`Recorded request session with status: ${requestSession.status}`);
44+
45+
const roundedDate = new Date();
46+
roundedDate.setSeconds(0, 0);
47+
const dateBucketKey = roundedDate.toISOString();
48+
49+
const existingClientAggregate = clientToRequestSessionAggregatesMap.get(client);
50+
const bucket = existingClientAggregate?.[dateBucketKey] || { exited: 0, crashed: 0, errored: 0 };
51+
bucket[({ ok: 'exited', crashed: 'crashed', errored: 'errored' } as const)[requestSession.status]]++;
52+
53+
if (existingClientAggregate) {
54+
existingClientAggregate[dateBucketKey] = bucket;
55+
} else {
56+
DEBUG_BUILD && debug.log('Opened new request session aggregate.');
57+
const newClientAggregate = { [dateBucketKey]: bucket };
58+
clientToRequestSessionAggregatesMap.set(client, newClientAggregate);
59+
60+
const flushPendingClientAggregates = (): void => {
61+
clearTimeout(timeout);
62+
unregisterClientFlushHook();
63+
clientToRequestSessionAggregatesMap.delete(client);
64+
65+
const aggregatePayload: AggregationCounts[] = Object.entries(newClientAggregate).map(
66+
([timestamp, value]) => ({
67+
started: timestamp,
68+
exited: value.exited,
69+
errored: value.errored,
70+
crashed: value.crashed,
71+
}),
72+
);
73+
client.sendSession({ aggregates: aggregatePayload });
74+
};
75+
76+
const unregisterClientFlushHook = client.on('flush', () => {
77+
DEBUG_BUILD && debug.log('Sending request session aggregate due to client flush');
78+
flushPendingClientAggregates();
79+
});
80+
const timeout = setTimeout(() => {
81+
DEBUG_BUILD && debug.log('Sending request session aggregate due to flushing schedule');
82+
flushPendingClientAggregates();
83+
}, sessionFlushingDelayMS);
84+
safeUnref(timeout);
85+
}
86+
}
87+
});
88+
}

0 commit comments

Comments
 (0)