Skip to content

Commit 783785d

Browse files
committed
stream: preserve AsyncLocalStorage context in finished()
1 parent 964e41c commit 783785d

File tree

2 files changed

+63
-14
lines changed

2 files changed

+63
-14
lines changed

lib/internal/streams/end-of-stream.js

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ const {
4343
willEmitClose: _willEmitClose,
4444
kIsClosedPromise,
4545
} = require('internal/streams/utils');
46+
47+
const { AsyncResource } = require('async_hooks');
48+
49+
// Lazy load
4650
let addAbortListener;
4751

4852
function isRequest(stream) {
@@ -63,7 +67,13 @@ function eos(stream, options, callback) {
6367
validateFunction(callback, 'callback');
6468
validateAbortSignal(options.signal, 'options.signal');
6569

66-
callback = once(callback);
70+
const originalCallback = callback;
71+
72+
const resource = new AsyncResource('stream:finished');
73+
74+
callback = once(function(err) {
75+
resource.runInAsyncScope(originalCallback, stream, err);
76+
});
6777

6878
if (isReadableStream(stream) || isWritableStream(stream)) {
6979
return eosWeb(stream, options, callback);
@@ -109,7 +119,7 @@ function eos(stream, options, callback) {
109119
}
110120

111121
if (!readable || readableFinished) {
112-
callback.call(stream);
122+
callback();
113123
}
114124
};
115125

@@ -128,12 +138,12 @@ function eos(stream, options, callback) {
128138
}
129139

130140
if (!writable || writableFinished) {
131-
callback.call(stream);
141+
callback();
132142
}
133143
};
134144

135145
const onerror = (err) => {
136-
callback.call(stream, err);
146+
callback(err);
137147
};
138148

139149
let closed = isClosed(stream);
@@ -144,21 +154,19 @@ function eos(stream, options, callback) {
144154
const errored = isWritableErrored(stream) || isReadableErrored(stream);
145155

146156
if (errored && typeof errored !== 'boolean') {
147-
return callback.call(stream, errored);
157+
return callback(errored);
148158
}
149159

150160
if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
151161
if (!isReadableFinished(stream, false))
152-
return callback.call(stream,
153-
new ERR_STREAM_PREMATURE_CLOSE());
162+
return callback(new ERR_STREAM_PREMATURE_CLOSE());
154163
}
155164
if (writable && !writableFinished) {
156165
if (!isWritableFinished(stream, false))
157-
return callback.call(stream,
158-
new ERR_STREAM_PREMATURE_CLOSE());
166+
return callback(new ERR_STREAM_PREMATURE_CLOSE());
159167
}
160168

161-
callback.call(stream);
169+
callback();
162170
};
163171

164172
const onclosed = () => {
@@ -167,10 +175,10 @@ function eos(stream, options, callback) {
167175
const errored = isWritableErrored(stream) || isReadableErrored(stream);
168176

169177
if (errored && typeof errored !== 'boolean') {
170-
return callback.call(stream, errored);
178+
return callback(errored);
171179
}
172180

173-
callback.call(stream);
181+
callback();
174182
};
175183

176184
const onrequest = () => {
@@ -309,15 +317,20 @@ function finished(stream, opts) {
309317
validateBoolean(opts.cleanup, 'cleanup');
310318
autoCleanup = opts.cleanup;
311319
}
320+
321+
const resource = new AsyncResource('stream:finished:promise');
322+
312323
return new Promise((resolve, reject) => {
313324
const cleanup = eos(stream, opts, (err) => {
314325
if (autoCleanup) {
315326
cleanup();
316327
}
328+
329+
// Use the AsyncResource to call resolve/reject
317330
if (err) {
318-
reject(err);
331+
resource.runInAsyncScope(reject, undefined, err);
319332
} else {
320-
resolve();
333+
resource.runInAsyncScope(resolve);
321334
}
322335
});
323336
});
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const http = require('http');
6+
const { AsyncLocalStorage } = require('async_hooks');
7+
const { finished } = require('stream');
8+
9+
// This test verifies that AsyncLocalStorage context is maintained
10+
// when using stream.finished()
11+
12+
const als = new AsyncLocalStorage();
13+
const store = { foo: 'bar' };
14+
15+
{
16+
const server = http.createServer(common.mustCall((req, res) => {
17+
als.run(store, () => {
18+
finished(res, common.mustCall(() => {
19+
assert.strictEqual(als.getStore()?.foo, 'bar');
20+
}));
21+
});
22+
23+
setTimeout(() => res.end(), 0);
24+
}));
25+
26+
server.listen(0, common.mustCall(() => {
27+
const port = server.address().port;
28+
29+
http.get(`http://localhost:${port}`, common.mustCall((res) => {
30+
res.resume();
31+
res.on('end', common.mustCall(() => {
32+
server.close();
33+
}));
34+
}));
35+
}));
36+
}

0 commit comments

Comments
 (0)