-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathduplex.ts
More file actions
103 lines (95 loc) · 2.76 KB
/
Copy pathduplex.ts
File metadata and controls
103 lines (95 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* Duplex channel implementation for bidirectional streaming.
*
* Creates a pair of connected channels where data written to one
* channel's writer appears in the other channel's readable.
*/
import { push } from './push.js';
import type { DuplexChannel, DuplexOptions, Writer } from './types.js';
/**
* Create a pair of connected duplex channels for bidirectional communication.
*
* Similar to Unix socketpair() - creates two endpoints where data written
* to one endpoint's writer appears in the other endpoint's readable.
*
* @param options - Optional configuration for both channels
* @returns A tuple of two connected DuplexChannel instances
*
* @example
* ```typescript
* const [client, server] = Stream.duplex();
*
* // Server echoes back what it receives
* (async () => {
* await using srv = server;
* for await (const chunks of srv.readable) {
* await srv.writer.writev(chunks);
* }
* })();
*
* // Client sends and receives
* {
* await using conn = client;
* await conn.writer.write('Hello');
* for await (const chunks of conn.readable) {
* console.log(new TextDecoder().decode(chunks[0])); // "Hello"
* break;
* }
* }
* ```
*/
export function duplex(options?: DuplexOptions): [DuplexChannel, DuplexChannel] {
const { highWaterMark, backpressure, signal, a, b } = options ?? {};
// Channel A writes to B's readable (A→B direction)
const { writer: aWriter, readable: bReadable } = push({
highWaterMark: a?.highWaterMark ?? highWaterMark,
backpressure: a?.backpressure ?? backpressure,
signal,
});
// Channel B writes to A's readable (B→A direction)
const { writer: bWriter, readable: aReadable } = push({
highWaterMark: b?.highWaterMark ?? highWaterMark,
backpressure: b?.backpressure ?? backpressure,
signal,
});
// Track closed state for idempotency
let aWriterRef: Writer | null = aWriter;
let bWriterRef: Writer | null = bWriter;
const channelA: DuplexChannel = {
get writer() {
return aWriter;
},
readable: aReadable,
async close() {
if (aWriterRef === null) return;
const writer = aWriterRef;
aWriterRef = null;
// Try sync first, fall back to async
if (writer.endSync() < 0) {
await writer.end();
}
},
[Symbol.asyncDispose]() {
return this.close();
},
};
const channelB: DuplexChannel = {
get writer() {
return bWriter;
},
readable: bReadable,
async close() {
if (bWriterRef === null) return;
const writer = bWriterRef;
bWriterRef = null;
// Try sync first, fall back to async
if (writer.endSync() < 0) {
await writer.end();
}
},
[Symbol.asyncDispose]() {
return this.close();
},
};
return [channelA, channelB];
}