Skip to content

Commit bfb4b36

Browse files
authored
fix: shard overload handle (#2895)
1 parent a68dbb4 commit bfb4b36

File tree

2 files changed

+38
-16
lines changed

2 files changed

+38
-16
lines changed

src/services/api/streaming.ts

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ import {parseMultipart} from '@mjackson/multipart-parser';
22
import qs from 'qs';
33

44
import {
5+
isErrorChunk,
56
isKeepAliveChunk,
67
isQueryResponseChunk,
78
isSessionChunk,
89
isStreamDataChunk,
910
} from '../../store/reducers/query/utils';
1011
import type {Actions, StreamQueryParams} from '../../types/api/query';
11-
import type {QueryResponseChunk, SessionChunk, StreamDataChunk} from '../../types/store/streaming';
12+
import type {
13+
QueryResponseChunk,
14+
SessionChunk,
15+
StreamDataChunk,
16+
StreamingChunk,
17+
} from '../../types/store/streaming';
1218
import {
1319
BINARY_DATA_IN_PLAIN_TEXT_DISPLAY,
1420
DEV_ENABLE_TRACING_FOR_ALL_REQUESTS,
@@ -94,24 +100,34 @@ export class StreamingAPI extends BaseYdbAPI {
94100
const traceId = response.headers.get('traceresponse')?.split('-')[1];
95101

96102
await parseMultipart(response.body, {boundary: BOUNDARY}, async (part) => {
103+
const text = await part.text();
104+
105+
let chunk: unknown;
97106
try {
98-
const chunk = JSON.parse(await part.text());
99-
100-
if (isSessionChunk(chunk)) {
101-
const sessionChunk = chunk;
102-
sessionChunk.meta.trace_id = traceId;
103-
options.onSessionChunk(chunk);
104-
} else if (isStreamDataChunk(chunk)) {
105-
options.onStreamDataChunk(chunk);
106-
} else if (isQueryResponseChunk(chunk)) {
107-
options.onQueryResponseChunk(chunk);
108-
} else if (isKeepAliveChunk(chunk)) {
109-
// Logging for debug purposes
110-
console.info('Received keep alive chunk');
111-
}
107+
chunk = JSON.parse(text);
112108
} catch (e) {
113109
throw new Error(`Error parsing chunk: ${e}`);
114110
}
111+
112+
if (isErrorChunk(chunk)) {
113+
await response.body?.cancel().catch(() => {});
114+
throw chunk;
115+
}
116+
117+
const streamingChunk = chunk as StreamingChunk;
118+
119+
if (isSessionChunk(streamingChunk)) {
120+
const sessionChunk = streamingChunk;
121+
sessionChunk.meta.trace_id = traceId;
122+
options.onSessionChunk(streamingChunk);
123+
} else if (isStreamDataChunk(streamingChunk)) {
124+
options.onStreamDataChunk(streamingChunk);
125+
} else if (isQueryResponseChunk(streamingChunk)) {
126+
options.onQueryResponseChunk(streamingChunk);
127+
} else if (isKeepAliveChunk(streamingChunk)) {
128+
// Logging for debug purposes
129+
console.info('Received keep alive chunk');
130+
}
115131
});
116132
}
117133
}

src/store/reducers/query/utils.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type {Actions} from '../../../types/api/query';
1+
import type {Actions, ErrorResponse} from '../../../types/api/query';
22
import type {QueryAction, QueryMode, QuerySyntax} from '../../../types/store/query';
33
import type {
44
QueryResponseChunk,
@@ -51,6 +51,12 @@ export function isKeepAliveChunk(content: StreamingChunk): content is SessionChu
5151
return content?.meta?.event === 'KeepAlive';
5252
}
5353

54+
export function isErrorChunk(content: unknown): content is ErrorResponse {
55+
return Boolean(
56+
content && typeof content === 'object' && ('error' in content || 'issues' in content),
57+
);
58+
}
59+
5460
export const prepareQueryWithPragmas = (query: string, pragmas?: string): string => {
5561
if (!pragmas || !pragmas.trim()) {
5662
return query;

0 commit comments

Comments
 (0)