-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
520 lines (466 loc) · 15.4 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
/**
 * Tuning knobs for `IncompleteJson`. Both flags are optional and default to
 * "partials allowed" when left unset.
 */
export type IncompleteJsonOptions = {
  /**
   * Hold numbers back until they have been read in full.
   *
   * *Note*: with the one-shot `IncompleteJson.parse` entry point, a document that is
   * nothing but a single number is *never* emitted while this flag is set, because
   * the end of the input cannot be told apart from a number that is still growing.
   * `ReadableStream` mode is unaffected, since the end of the stream is signalled explicitly.
   */
  prohibitPartialNumbers?: boolean
  /**
   * Hold strings back until they have been read in full.
   */
  prohibitPartialStrings?: boolean
}
/**
* Incremental parser that turns a growing prefix of a JSON document into the
* largest value that can currently be read from it.
*
* Chunks are fed in through `addChunk`; `readValue` returns the value parsed at the
* most recently recorded safe truncation point; `done` signals that no more input
* will arrive. While consuming, every `{`/`[` in the input is rewritten into a
* wrapper object carrying a completion sentinel, and `cachedJSONParse` unwraps
* those wrappers again, moving the sentinel onto the `ItemDoneStreaming` symbol key.
*/
export class IncompleteJson<T> {
/**
* Parse a prefix of a JSON serialized string into as much data as possible
* Options available to ensure atomic string/number parsing and mark completed objects
*
* Returns `undefined` for an empty string. Otherwise the whole string is fed to a
* fresh parser, which is marked done unless `prohibitPartialNumbers` is set and the
* string ends in a digit or `.` (a trailing number might still be incomplete).
*/
static parse<T>(
string: string,
options?: IncompleteJsonOptions,
): Incomplete<T> {
if (!string) return undefined
const parser = new IncompleteJson<T>(options)
parser.addChunk(string)
// only force a full parse when the tail cannot be a number that is still growing
if (!options?.prohibitPartialNumbers || !string.match(/(\d|\.)$/)) {
parser.done()
}
return parser.readValue()
}
/**
* Parse a ReadableStream<string> of JSON data into a ReadableStream<TData> of
* as much data as can be extracted from the stream at each moment in time.
*
* A value is enqueued only when `readValue()` returns something that is not
* identical (`!==`) to the previously enqueued value. When the source closes, the
* parser is marked done and one final value is enqueued if it changed.
*/
static fromReadable<T>(
readable: ReadableStream<string>,
options?: IncompleteJsonOptions,
): ReadableStream<Incomplete<T>> {
const parser = new IncompleteJson<T>(options)
// last value handed downstream; used to suppress repeats
let prior: Incomplete<T>
const transformer = new TransformStream<string, Incomplete<T>>({
start() {},
transform(chunk, controller) {
parser.addChunk(chunk)
const next = parser.readValue()
if (next !== prior) {
controller.enqueue(next)
prior = next
}
},
flush(controller) {
// end of input: let the parser emit anything it was holding back
parser.done()
const next = parser.readValue()
if (next !== prior) {
controller.enqueue(next)
prior = next
}
},
})
return readable.pipeThrough(transformer)
}
// rewritten JSON text accumulated so far (input with the sentinel wrappers injected)
private consumed = ""
// tail of an escape sequence that was cut off by a chunk boundary; replayed at the start of the next addChunk
private unconsumed = ""
// true while the cursor is between an opening and a closing `"`
private inString = false
// true while the cursor is believed to be inside a numeric literal
private inNumber = false
// characters that would have to be appended to `consumed` to close everything currently open; innermost first
private charsNeededToClose: string = ""
// stack of enclosing containers, innermost first: "key"/"val" inside an object, "arr" inside an array
private context: ("key" | "val" | "arr")[] = []
// set by done(); addChunk throws once this is true
private isDone = false
// latest safe truncation point: how much of `consumed` to keep, what to append to close it,
// and (once computed) the value parsed from that text
private truncationInfo:
| {
index: number
append: string
result?: Incomplete<T>
}
| undefined = undefined
// randomly generated property name used for the injected completion sentinel
private internalObjectStreamComplete: string
// property name under which the original object/array is stored inside the injected wrapper
private internalObjectRawLiteral = "value"
/**
* Create a parser. The sentinel property name is built as `__` followed by three
* base-36 renderings of `Math.random()` digits.
*/
constructor(private options?: IncompleteJsonOptions) {
this.internalObjectStreamComplete =
"__" +
Number(String(Math.random()).slice(2)).toString(36) +
Number(String(Math.random()).slice(2)).toString(36) +
Number(String(Math.random()).slice(2)).toString(36)
}
/**
* Add a chunk of data to the stream.
*
* This runs in time linear to the size of the chunk.
*
* Throws if the parser has already been marked done.
*/
addChunk(chunk: string) {
if (this.isDone) throw Error("Cannot add chunk to parser marked done")
// called to save the current state as a "safe" spot to truncate and provide a result
// (`delta` shifts the cut relative to the end of `consumed`; no `result` is stored,
// so readValue will parse this spot lazily)
const markTruncateSpot = (delta = 0) =>
(this.truncationInfo = {
index: this.consumed.length + delta,
append: this.charsNeededToClose,
})
// consume everything we didn't consume last time, then the new chunk
const toConsume = this.unconsumed + chunk
this.unconsumed = ""
for (let i = 0; i < toConsume.length; i++) {
const c = toConsume[i]
// atomically consume escape sequences
if (this.inString && c === "\\") {
// we have seen the `\`
i++
const escaped = toConsume[i]
// unicode escapes are of the form \uXXXX
if (escaped === "u") {
if (toConsume[i + 4] !== undefined) {
// if we can grab 4 chars forward, do so
this.consumed += c + escaped + toConsume.slice(i + 1, i + 5)
} else {
// otherwise, save the rest of the string for later
this.unconsumed = c + escaped + toConsume.slice(i + 1, i + 5)
}
// we have seen either 4 chars or until the end of the string
// (if this goes over the end the loop exits normally)
i += 4
} else if (escaped !== undefined) {
// standard two char escape (\n, \t, etc.)
this.consumed += c + escaped
} else {
// end of chunk. save the \ to tack onto front of next chunk
this.unconsumed = c
}
// restart from after the sequence
continue
}
// outside a string, any character that coerces to a number switches number mode on
// NOTE(review): unary plus also coerces whitespace to 0, so whitespace sets this flag too;
// it is cleared again by the check below on the next non-numeric character
if (!this.inString && !isNaN(+c)) {
this.inNumber = true
}
// leave number mode on the first character that cannot be part of a numeric literal
if (
this.inNumber &&
isNaN(+c) &&
c !== "-" &&
c !== "e" &&
c !== "+" &&
c !== "E" &&
c !== "."
) {
this.inNumber = false
}
// inject completed object sentinels as required
// basically, convert:
// `A { B ` => `A {isComplete: false, value: { B `
// `A { B } C` => `A {isComplete: false, value: { B }, isComplete: true} C`
// `A [ B ` => `A {isComplete: false, value: [ B `
// `A [ B ] C` => `A {isComplete: false, value: [ B ], isComplete: true} C`
// The wrappers are flattened again, and the sentinel value moved onto the
// ItemDoneStreaming symbol key, in this.cachedJSONParse
if (!this.inString && c === "}") {
this.consumed += `}, "${this.internalObjectStreamComplete}": true}`
// drop two closers: the object's own `}` and the wrapper's `}`
this.charsNeededToClose = this.charsNeededToClose.slice(2)
markTruncateSpot()
} else if (!this.inString && c === "{") {
this.consumed += `{"${this.internalObjectStreamComplete}": false, "${this.internalObjectRawLiteral}": {`
// closer for the injected wrapper; the object's own closer is pushed further down
this.charsNeededToClose = "}" + this.charsNeededToClose
} else if (!this.inString && c === "[") {
this.consumed += `{"${this.internalObjectStreamComplete}": false, "${this.internalObjectRawLiteral}": [`
// closer for the injected wrapper; the array's own `]` is pushed further down
this.charsNeededToClose = "}" + this.charsNeededToClose
} else if (!this.inString && c === "]") {
this.consumed += `], "${this.internalObjectStreamComplete}": true}`
// drop two closers: the array's `]` and the wrapper's `}`
this.charsNeededToClose = this.charsNeededToClose.slice(2)
markTruncateSpot()
} else {
//otherwise, consume the char itself
this.consumed += c
}
if (this.inString && c !== '"') {
// if partial strings allowed, every location in a string is a potential truncate spot
// EXCEPT in key strings - the following cannot be completed: { "ab
if (
this.context[0] !== "key" &&
!this.options?.prohibitPartialStrings
) {
markTruncateSpot()
}
// skip over the special char handling
continue
}
// consuming a matching closing " - pop it from the stack
// (a `"` closer is only ever pushed when partial strings are allowed, see below)
if (c === this.charsNeededToClose[0] && c === '"') {
this.charsNeededToClose = this.charsNeededToClose.slice(1)
// good place to truncate, unless we're in a key
if (this.context[0] !== "key") {
markTruncateSpot()
}
}
if (this.inNumber && !this.options?.prohibitPartialNumbers) {
// symbols found in numbers
if (c === "e" || c === "." || c === "E") {
// unparsable as suffixes, trim them if partials allowed
markTruncateSpot(-1)
}
}
if (c === '"') {
// toggle string mode
this.inString = !this.inString
// if we aren't prohibiting partial strings and we are starting a new string,
// note how to close this string
if (!this.options?.prohibitPartialStrings && this.inString) {
this.charsNeededToClose = '"' + this.charsNeededToClose
if (this.context[0] !== "key") {
markTruncateSpot()
}
}
}
if (c === ",") {
// truncate right before the `,`
markTruncateSpot(-1)
// when parsing object, comma switches from val context to key
if (this.context[0] === "val") {
this.context[0] = "key"
}
}
// colon switches from key context to val
if (c === ":") {
if (this.context[0] === "key") {
this.context[0] = "val"
}
}
// in array: strings can always be truncated
if (c === "[") {
this.context.unshift("arr")
this.charsNeededToClose = "]" + this.charsNeededToClose
markTruncateSpot()
}
// in object: strings can be truncated in values, but not keys!
if (c === "{") {
this.context.unshift("key")
this.charsNeededToClose = "}" + this.charsNeededToClose
markTruncateSpot()
}
// exiting our context, pop!
if (c === "}" || c === "]") {
this.context.shift()
}
}
}
/**
* Mark the stream complete.
* This will force emit values that were not emitted previously due to potentially being incomplete.
*
* Calling it again after the first call is a no-op.
*
* Example: a chunk that ends in a number will not be emitted because the next chunk may continue with a number,
* which would be appended to the existing value (significantly changing the meaning of the represented data)
* ```ts
* > const ij = new IncompleteJson<number>()
* > ij.addChunk('1234')
* > ij.readValue()
* undefined
* > ij.addChunk('5')
* > ij.readValue()
* undefined
* > ij.done()
* > ij.readValue()
* 12345
* ```
*/
done() {
if (this.isDone) return
this.isDone = true
// everything consumed so far, closed with whatever is still open
const rawData = this.consumed + this.charsNeededToClose
try {
const result = this.cachedJSONParse(rawData)
// the full text parsed: make it the truncation spot, with the result already attached
this.truncationInfo = {
index: this.consumed.length,
append: this.charsNeededToClose,
result,
}
} catch {
// pass: this JSON parse is expected to fail in some cases,
// for instance when IncompleteJson.parse is called without a complete stream.
// the existing truncationInfo will still be good
}
}
/**
* Attempt to parse the consumed chunks into as much data as is available
*
* Returns `undefined` when nothing has been consumed or no truncation spot has been
* recorded yet. The parsed value is stored on the truncation spot, so repeated calls
* without new input return the same value. If the text at the truncation spot fails
* to parse, the parser state is logged with `console.error` and the error is rethrown.
*
* This operation runs in time linear to the length of the stream
*
* While modern JSON parsers are significantly faster than modern LLM's, care
* should be taken on very large inputs to not call readValue more than needed
*/
readValue(): Incomplete<T> {
if (!this.consumed || !this.truncationInfo) {
return undefined
}
// parse lazily: only when this truncation spot has no stored result yet
if (!("result" in this.truncationInfo)) {
try {
this.truncationInfo.result = this.cachedJSONParse(
this.consumed.slice(0, this.truncationInfo.index) +
this.truncationInfo.append,
)
} catch (e) {
console.error(
"ERROR: readValue called with bogus internal state.",
this,
)
throw e
}
}
return this.truncationInfo.result
}
// single-entry cache: the last string parsed and the value it produced
private rawParseCache = { key: "", value: undefined as Incomplete<T> }
/**
* `JSON.parse` with a one-entry cache keyed on the exact input string.
*
* The reviver unwraps the sentinel wrappers injected by `addChunk`: for any object
* that has the sentinel property, the wrapped value is returned in its place, with
* the sentinel's boolean copied onto its `ItemDoneStreaming` symbol key.
*/
private cachedJSONParse(str: string): Incomplete<T> {
if (str !== this.rawParseCache.key) {
this.rawParseCache = {
key: str,
value: JSON.parse(str, (k, v) => {
if (
typeof v === "object" &&
v &&
this.internalObjectStreamComplete in v
) {
const raw = v[this.internalObjectRawLiteral]
raw[ItemDoneStreaming] = v[this.internalObjectStreamComplete]
return raw
}
return v
}),
}
}
return this.rawParseCache.value
}
}
/**
 * Build a stream that decodes bytes into text.
 *
 * Uses the platform `TextDecoderStream` when constructing one succeeds; otherwise
 * falls back to a `TransformStream` driven by a streaming `TextDecoder`. The
 * fallback never forwards empty strings.
 */
const PollyfillTextDecoderStream = () => {
  try {
    return new TextDecoderStream()
  } catch {
    // native implementation unavailable: build the equivalent by hand below
  }

  const utf8 = new TextDecoder()
  // forward decoded text downstream, skipping empty results
  const forward = (text: string, sink: { enqueue(chunk: string): void }) => {
    if (text.length !== 0) {
      sink.enqueue(text)
    }
  }

  return new TransformStream<Uint8Array, string>({
    // `stream: true` keeps a trailing partial multi-byte sequence for the next chunk
    transform: (bytes, sink) => forward(utf8.decode(bytes, { stream: true }), sink),
    // a final argument-less decode drains whatever the decoder is still holding
    flush: (sink) => forward(utf8.decode(), sink),
  })
}
/**
 * Given an OpenAI streaming-style `Response`, convert it to either an error object
 * (when the request was unsuccessful) or a `ReadableStream<string>` of the
 * `choices[0].delta.content` values carried by the server-sent events.
 *
 * Events are buffered until their terminating blank line arrives, so an event may
 * be split across network chunks at any position. Events that do not start with
 * `data: `, whose payload is not valid JSON, or that carry no string `content`
 * (e.g. an empty `choices` array or `content: null`) are skipped. Everything after
 * a `data: [DONE]` event is ignored.
 *
 * @param response the `fetch` response of a streaming completion request
 * @returns `{ error }` holding the parsed JSON body when the status is not 200,
 *   otherwise a stream of content fragments
 * @throws Error when the status is 200 but the response has no body
 */
export const ReadableFromOpenAIResponse = (
  response: Response,
):
  | (ReadableStream<string> & { error?: never })
  | {
      error: Promise<{ error: { message: string; type: string; code: string } }>
    } => {
  if (response.status !== 200) {
    return { error: response.json() }
  }
  if (!response.body) {
    throw Error("Response is non-erroneous but has no body.")
  }

  // only the fields this function reads; everything is optional because the
  // payload comes straight from the network
  type StreamedChunk = {
    choices?: ({ delta?: { content?: unknown } | null } | null)[] | null
  } | null

  const dataPrefix = "data: "
  // events are separated by a blank line; accept both LF and CRLF line endings
  const eventBoundary = /\r?\n\r?\n/
  // text received after the last complete event boundary
  let pending = ""
  // set once `[DONE]` has been seen
  let finished = false

  // handle one complete event: emit its content fragment, if it has one
  const emitEvent = (
    rawEvent: string,
    controller: TransformStreamDefaultController<string>,
  ): void => {
    const event = rawEvent.trim()
    if (!event.startsWith(dataPrefix)) return
    const payload = event.slice(dataPrefix.length)
    if (payload === "[DONE]") {
      finished = true
      return
    }
    let json: unknown
    try {
      json = JSON.parse(payload)
    } catch {
      // the event is complete (its boundary was seen) yet unparsable: skip it
      // rather than letting it corrupt the events that follow
      return
    }
    // extraction is kept apart from parsing so a well-formed event with an
    // unexpected shape is never mistaken for a truncated one
    const content = (json as StreamedChunk)?.choices?.[0]?.delta?.content
    if (typeof content === "string") {
      controller.enqueue(content)
    }
  }

  return response.body.pipeThrough(PollyfillTextDecoderStream()).pipeThrough(
    new TransformStream<string, string>({
      transform(value, controller) {
        if (finished) return
        const events = (pending + value).split(eventBoundary)
        // the last piece has not seen its boundary yet; keep it for the next chunk
        pending = events.pop() ?? ""
        for (const event of events) {
          if (finished) break
          emitEvent(event, controller)
        }
      },
      flush(controller) {
        // the stream ended without a trailing blank line: process what is left
        if (!finished && pending) {
          emitEvent(pending, controller)
        }
        pending = ""
      },
    }),
  )
}
/**
 * Adapt a `ReadableStream` into an `AsyncIterable` so it can be consumed with
 * `for await (... of ...) { ... }`.
 *
 * Per the spec a `ReadableStream` is itself `AsyncIterable`, but most browsers do not
 * support this (server-side support is better).
 *
 * The stream is read through a reader obtained with `getReader()`; the reader's lock
 * is released when iteration ends, whether it ran to completion, was abandoned early,
 * or threw.
 *
 * See https://github.com/microsoft/TypeScript/issues/29867, https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
 */
export async function* AsAsyncIterable<T>(
  readable: ReadableStream<T>,
): AsyncIterable<T> {
  const reader = readable.getReader()
  try {
    let step = await reader.read()
    while (!step.done) {
      yield step.value
      step = await reader.read()
    }
  } finally {
    // always hand the stream back, even if the consumer stopped early
    reader.releaseLock()
  }
}
/**
* Symbolic sentinel added to objects/arrays to report whether the stream has finished defining the value.
*
* `IncompleteJson` sets this symbolic key on every object/array it produces: the value is
* `true` once the closing `}`/`]` has been read, and `false` while the value is still
* streaming. In the `Incomplete<T>` type the still-streaming case is modelled as the key
* being absent (`?: never`), so treat anything other than `true` as "not finished".
*
* Ex:
* ```ts
* const values = IncompleteJson.parse<{ value: string }[]>(
* '[{"value": "a"}, {"value": "ab',
* )
*
* values?.[ItemDoneStreaming] // false
* values?.[0]?.[ItemDoneStreaming] // true
* values?.[1]?.[ItemDoneStreaming] // false
* ```
*/
export const ItemDoneStreaming = Symbol("gjp-4-gpt.ItemDoneStreaming")
/**
* Deep partial of `T`
*
* - When `T` is an array of `V`: either `Complete<V[]>`, or an array of non-`undefined`
* `Incomplete<V>` elements whose `[ItemDoneStreaming]` key is typed as absent.
* - When `T` is any other object: either `Complete<T>`, or an object in which every
* property is optional and itself `Incomplete`, with `[ItemDoneStreaming]` typed as absent.
* - Otherwise: `T` unchanged.
*
* `undefined` is always a member of the result.
*
* The union members whose `[ItemDoneStreaming]` is `true` are the `Complete<>` ones, in
* which the value and all of its children are non-partial.
*/
export type Incomplete<T> =
| (T extends (infer V)[]
?
| Complete<V[]>
| ({ [ItemDoneStreaming]?: never } & Exclude<
Incomplete<V>,
undefined
>[])
: T extends object
?
| Complete<T>
| ({ [ItemDoneStreaming]?: never } & {
[P in keyof T]?: Incomplete<T[P]>
})
: T)
| undefined
/**
* Deeply adds the entry `[ItemDoneStreaming]: true` to every array and object in `T`
*
* Arrays become an array of `Complete` elements, objects keep all of their keys with
* `Complete` property types, and any other type is returned unchanged.
*/
export type Complete<T> = T extends (infer V)[]
? { [ItemDoneStreaming]: true } & Complete<V>[]
: T extends object
? { [ItemDoneStreaming]: true } & {
[P in keyof T]: Complete<T[P]>
}
: T