Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,12 @@ The pipeline always creates an internal `AbortController` whose signal is passed
to transforms via `TransformCallbackOptions`. If an external signal is provided,
the internal signal follows it. When aborted, on error, or when the consumer
stops iterating, the internal controller is aborted, notifying all transforms.
Aborting the external signal rejects pending and future pulls from the returned
async iterable with the signal's abort reason. If the pipeline aborts while
waiting for an upstream source iterator's `next()` result, the pipeline rejects
the consumer's pending pull and calls the source iterator's `return()` method
when available. This does not require canceling the promise returned by the
upstream `next()` call.

```typescript
function pull(
Expand Down
2 changes: 2 additions & 0 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ The <dfn method for="Stream">pull(source, ...args)</dfn> method creates a [=pull
<li>Let |signal| be |options|["{{PullOptions/signal}}"] if present; otherwise `undefined`.
<li>If |signal| is not `undefined` and [=AbortSignal/aborted=], return an async iterable that immediately throws the abort reason.
<li>Let |pipelineController| be a new {{AbortController}}. If |signal| is not `undefined`, set |pipelineController|'s signal to follow |signal|.
<li>If |pipelineController|'s signal aborts before a pull from the returned async iterable settles, reject that pull with |pipelineController|'s abort reason. Future pulls must also reject with that reason.
<li>If |pipelineController|'s signal aborts while waiting for an upstream source iterator's `next()` result, reject the returned async iterable's pending pull and call that upstream iterator's `return()` method if present. This does not require canceling the promise returned by the upstream `next()` call.
<li>Let |transformOptions| be a {{TransformCallbackOptions}} with signal set to |pipelineController|'s signal.
<li>Let |composed| be the result of [=compose transform pipeline=] with |normalized|, |transforms|, and |transformOptions|.
<li>Return |composed|. On return (consumer breaks), the pipeline controller is aborted.
Expand Down
51 changes: 51 additions & 0 deletions src/pull.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,57 @@ describe('pull()', () => {
await collectBytes(output);
}, /Abort/);
});

it('should reject a pending source next() when aborted', async () => {
const controller = new AbortController();
const reason = new DOMException('Aborted', 'AbortError');
let returned = false;
const source: AsyncIterable<Uint8Array[]> = {
[Symbol.asyncIterator]() {
return {
next() {
return new Promise<IteratorResult<Uint8Array[]>>(() => undefined);
},
return() {
returned = true;
return Promise.resolve({ done: true, value: undefined });
},
};
},
};

const iterator = pull(source, { signal: controller.signal })[Symbol.asyncIterator]();
const next = iterator.next();
controller.abort(reason);

await assert.rejects(next, { name: 'AbortError' });
assert.strictEqual(returned, true);
});

it('should reject future next() calls after abort', async () => {
const controller = new AbortController();
const reason = new DOMException('Aborted', 'AbortError');
const source = from('test');
const iterator = pull(source, { signal: controller.signal })[Symbol.asyncIterator]();

controller.abort(reason);

await assert.rejects(iterator.next(), { name: 'AbortError' });
});

it('should reject future next() calls after abort for sync sources without transforms', async () => {
const controller = new AbortController();
const reason = new DOMException('Aborted', 'AbortError');
const iterator = pull(['a', 'b'], { signal: controller.signal })[Symbol.asyncIterator]();

const first = await iterator.next();
assert.strictEqual(first.done, false);
assert.strictEqual(decode(concatBytes(first.value)), 'a');

controller.abort(reason);

await assert.rejects(iterator.next(), { name: 'AbortError' });
});
});

describe('error handling', () => {
Expand Down
128 changes: 104 additions & 24 deletions src/pull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
type TransformCallbackOptions,
type StatefulTransformFn,
type TransformYield,
type AsyncStreamableYield,
type AsyncTransformResult,
type SyncTransform,
type SyncTransformObject,
Expand Down Expand Up @@ -460,6 +461,76 @@ async function* syncToAsync<T>(source: Iterable<T>): AsyncGenerator<T> {
}
}

/**
* Return the standard abort reason for a signal.
*/
function getAbortReason(signal: AbortSignal): unknown {
return signal.reason ?? new DOMException('Aborted', 'AbortError');
}

/**
* Race an operation with an abort signal.
*/
function raceWithAbort<T>(operation: Promise<T>, signal: AbortSignal): Promise<T> {
if (signal.aborted) {
return Promise.reject(getAbortReason(signal));
}

return new Promise<T>((resolve, reject) => {
const onAbort = () => {
reject(getAbortReason(signal));
};

signal.addEventListener('abort', onAbort, { once: true });

operation.then(
(value) => {
signal.removeEventListener('abort', onAbort);
resolve(value);
},
(error) => {
signal.removeEventListener('abort', onAbort);
reject(error);
}
);
});
}

/**
* Wrap an async iterable so pending next() calls reject on abort.
*/
async function* abortableAsyncIterable<T>(
source: AsyncIterable<T>,
signal: AbortSignal
): AsyncGenerator<T> {
const iterator = source[Symbol.asyncIterator]();
let completed = false;

try {
while (true) {
const result = await raceWithAbort(Promise.resolve(iterator.next()), signal);
if (result.done) {
completed = true;
return;
}
yield result.value;
}
} finally {
if (!completed && typeof iterator.return === 'function') {
try {
const returned = Promise.resolve(iterator.return());
if (!signal.aborted) {
await returned;
} else {
returned.catch(() => undefined);
}
} catch {
// Suppress return() failures during teardown.
}
}
}
}

/**
* Create an async pipeline from source through transforms.
*/
Expand All @@ -470,13 +541,29 @@ async function* createAsyncPipeline(
): AsyncGenerator<Uint8Array[]> {
// Check for abort
if (signal?.aborted) {
throw signal.reason ?? new DOMException('Aborted', 'AbortError');
throw getAbortReason(signal);
}

const controller = new AbortController();
let abortHandler: (() => void) | undefined;
if (signal) {
abortHandler = () => {
try {
controller.abort(getAbortReason(signal));
} catch { /* transform signal listeners may throw — suppress */ }
};
signal.addEventListener('abort', abortHandler);
}

// Normalize source to async
let normalized: AsyncIterable<Uint8Array[]>;
if (isAsyncIterable(source)) {
normalized = normalizeAsyncSource(source);
normalized = normalizeAsyncSource(
abortableAsyncIterable(
source as AsyncIterable<AsyncStreamableYield>,
controller.signal
)
);
} else if (isSyncIterable(source)) {
normalized = syncToAsync(normalizeSyncSource(source as SyncStreamable));
} else {
Expand All @@ -485,28 +572,25 @@ async function* createAsyncPipeline(

// Fast path: no transforms, just yield normalized source directly
if (transforms.length === 0) {
for await (const batch of normalized) {
if (signal?.aborted) {
throw signal.reason ?? new DOMException('Aborted', 'AbortError');
let completed = false;
try {
for await (const batch of abortableAsyncIterable(normalized, controller.signal)) {
yield batch;
}
completed = true;
} finally {
if (!completed && !controller.signal.aborted) {
try {
controller.abort(new DOMException('Aborted', 'AbortError'));
} catch { /* transform signal listeners may throw — suppress */ }
}
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
yield batch;
}
return;
}

// Create internal controller for transform cancellation.
// Note: if signal was already aborted, we threw above — no need to check here.
const controller = new AbortController();
let abortHandler: (() => void) | undefined;
if (signal) {
abortHandler = () => {
try {
controller.abort(signal.reason ?? new DOMException('Aborted', 'AbortError'));
} catch { /* transform signal listeners may throw — suppress */ }
};
signal.addEventListener('abort', abortHandler);
}

// Add flush signal
let current: AsyncIterable<Uint8Array[] | null> = withFlushSignalAsync(normalized);

Expand All @@ -524,11 +608,7 @@ async function* createAsyncPipeline(
// Yield results (filter out null from final output)
let completed = false;
try {
for await (const batch of current) {
// Check for abort on each iteration
if (controller.signal.aborted) {
throw controller.signal.reason ?? new DOMException('Aborted', 'AbortError');
}
for await (const batch of abortableAsyncIterable(current, controller.signal)) {
if (batch !== null) {
yield batch;
}
Expand Down