fix(realtime): gracefully recover from ECONNRESET errors when sending stream data from tasks to the server #2611
base: main
Conversation
🦋 Changeset detected

Latest commit: a236826

The changes in this PR will be included in the next version bump. This PR includes changesets to release 24 packages.
Walkthrough

Reworks realtime streaming: removes relayRealtimeStreams and adds RedisRealtimeStreams with per-client chunking, inactivity timeouts, SSE pinging, getLastChunkIndex, and updated ingest/stream signatures (supports Last-Event-ID resume). Adds S2RealtimeStreams and an S2-backed metadata stream path (S2MetadataStream), plus client-side MetadataStream buffering, retry/backoff, HEAD-based resume, and runStream reconnects. API and SDK changes: createStream, client future flags, run subscription stream handling, and propagation of realtimeStreamsVersion through trigger paths. DB schema and migrations add realtimeStreams and realtimeStreamsVersion. Docker adds toxiproxy and nginx-h2; extensive tests added.

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed).
Force-pushed from 7272420 to 50fbba5 (compare).
Actionable comments posted: 6
🧹 Nitpick comments (16)
docker/docker-compose.yml (1)
144-156: Good addition for testing network failure scenarios.

The toxiproxy service is well-configured for simulating connection resets and other network faults locally. The setup enables effective testing of the stream resumption logic.
Optional: Consider IPv4/Linux compatibility notes.
A minor consideration for developers: the config uses an IPv6 listen address (`[::]`), which should work on most Docker Desktop setups (Mac/Windows). On Linux systems, you may need to verify IPv6 is enabled in Docker, or alternatively update the config to use `0.0.0.0:30303` for IPv4-only. Additionally, `host.docker.internal:3030` may require extra configuration on Linux (some distributions define `host-gateway` instead); consider documenting this in the repo's development setup guide if not already present.
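For readers trying this locally, a minimal sketch of driving toxiproxy through its HTTP admin API (default port 8474) to provoke an ECONNRESET, matching the listen/upstream addresses discussed above. The proxy name "webapp" and the exact toxic attributes are assumptions, not the repo's actual config:

```typescript
const TOXIPROXY_API = "http://localhost:8474";

async function simulateConnectionReset(): Promise<void> {
  // Create the proxy (mirrors the addresses in docker/config/toxiproxy.json;
  // the name is illustrative).
  await fetch(`${TOXIPROXY_API}/proxies`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      name: "webapp",
      listen: "[::]:30303",
      upstream: "host.docker.internal:3030",
      enabled: true,
    }),
  });

  // The reset_peer toxic closes connections with a TCP RST, which surfaces
  // client-side as ECONNRESET.
  await fetch(`${TOXIPROXY_API}/proxies/webapp/toxics`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      type: "reset_peer",
      attributes: { timeout: 0 },
    }),
  });
}
```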
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)

19-27: Harden resume header parsing (guard NaN/negative).

Ensure only non-negative integers are used; otherwise ignore the header to avoid bad resumes.

Apply:

```diff
- const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
- const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+ const resumeHeader = request.headers.get("x-resume-from-chunk");
+ const parsed = resumeHeader != null ? Number(resumeHeader) : undefined;
+ const resumeFromChunkNumber =
+   Number.isInteger(parsed) && parsed >= 0 ? parsed : undefined;
```

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
142-147: Avoid unbounded buffering on the tee'd local branch.

If no reader locks the relayed stream quickly, the tee can buffer indefinitely. Consider dropping the local branch if not locked within waitForBufferTimeout, or switching to a bounded ring buffer.

Minimal option: schedule a cleanup if not locked:

```diff
  this.handleLocalIngestion(localStream, runId, streamId).catch((err) => {
    logger.error("[RelayRealtimeStreams][ingestData] Error in local ingestion:", { err });
  });
+ // If no reader locks this buffer soon, drop it to bound memory.
+ setTimeout(() => {
+   const key = `${runId}:${streamId}`;
+   const rec = this._buffers.get(key);
+   if (rec && !rec.locked) this.deleteBuffer(key);
+ }, this.waitForBufferTimeout).unref?.();
```

apps/webapp/app/services/realtime/types.ts (1)
8-13: Document resume semantics and consider type aliases.

- Clarify in JSDoc that resumeFromChunk is a zero-based, non-negative integer.
- Team guideline: prefer types over interfaces; consider `type StreamIngestor = { … }` when convenient.

Example:

```ts
/** resumeFromChunk: zero-based, non-negative integer to start chunk indexing from. */
```

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)
56-69: Harden resume header parsing (guard NaN/negative).

Mirror the safer parsing to avoid propagating invalid values.

Apply:

```diff
- const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
- const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+ const resumeHeader = request.headers.get("x-resume-from-chunk");
+ const parsed = resumeHeader != null ? Number(resumeHeader) : undefined;
+ const resumeFromChunkNumber =
+   Number.isInteger(parsed) && parsed >= 0 ? parsed : undefined;
```

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (3)
176-184: Don't log full chunk payloads; log size/metadata instead.

Payloads can be large/sensitive. Prefer size and indices.

Apply:

```diff
- logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
-   streamKey,
-   runId,
-   chunkIndex: currentChunkIndex,
-   resumeFromChunk: startChunk,
-   value,
- });
+ logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
+   streamKey,
+   runId,
+   chunkIndex: currentChunkIndex,
+   resumeFromChunk: startChunk,
+   valueLength: value.length,
+ });
```
216-223: Include the last known chunk index on ECONNRESET to aid immediate resume.

Return X-Last-Chunk-Index so clients can resume without an extra HEAD.

Apply:

```diff
- if ("code" in error && error.code === "ECONNRESET") {
-   logger.info("[RealtimeStreams][ingestData] Connection reset during ingestData:", {
-     error,
-   });
-   return new Response(null, { status: 500 });
- }
+ if ("code" in error && error.code === "ECONNRESET") {
+   logger.info("[RealtimeStreams][ingestData] Connection reset during ingestData:", {
+     error,
+   });
+   // Best effort: report last chunk index to caller
+   let last = -1;
+   try {
+     last = await this.getLastChunkIndex(runId, streamId);
+   } catch {}
+   return new Response(null, {
+     status: 500,
+     headers: { "X-Last-Chunk-Index": String(last) },
+   });
+ }
```
74-88: SSE completion: consider closing on END_SENTINEL regardless of batch position.

If END_SENTINEL is present, the stream is logically done. The current check (only when last in batch) might delay the close unnecessarily.

Apply:

```diff
- if (data === END_SENTINEL && i === entries.length - 1) {
+ if (data === END_SENTINEL) {
    controller.close();
    return;
  }
```

packages/core/src/v3/runMetadata/metadataStream.ts (8)
1-3: Prefer isomorphic fetch over node:http(s).

Using node:https/http makes this non-isomorphic. Consider fetch + ReadableStream for POST/HEAD to align with our TS guidance and enable edge/browser runtimes. As per coding guidelines. A fetch-based sketch follows.
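A minimal sketch of the fetch-based alternative. The endpoint URL, headers, and chunk source are stand-ins for MetadataStream's real options; note that Node's fetch (undici) requires the duplex option when sending a ReadableStream body:

```typescript
// Sketch only: stream NDJSON chunks to an ingest endpoint with fetch.
async function postStream(
  url: string,
  headers: Record<string, string>,
  chunks: AsyncIterable<unknown>,
  signal?: AbortSignal
): Promise<Response> {
  const encoder = new TextEncoder();
  const body = new ReadableStream<Uint8Array>({
    async start(controller) {
      for await (const chunk of chunks) {
        controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      }
      controller.close();
    },
  });

  return fetch(url, {
    method: "POST",
    headers,
    body,
    signal,
    // Required by Node's fetch for streaming request bodies.
    // @ts-expect-error "duplex" is not yet in all TypeScript DOM typings
    duplex: "half",
  });
}
```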
18-21: Use a type alias over an interface.

We prefer types in TS here.

Apply:

```diff
-interface BufferedChunk<T> {
-  index: number;
-  data: T;
-}
+type BufferedChunk<T> = {
+  index: number;
+  data: T;
+};
```
46-50: Buffering starts before the request; risk of early-chunk eviction.

If the source is fast and maxBufferSize is small, chunks 0..N can be overwritten before the POST starts, causing irreversible gaps. Start the request before aggressive buffering, or gate buffering when the ring nears capacity; a sketch of the latter follows.
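One way to gate the reader so unsent chunks are never evicted, sketched as a standalone loop. All names here (the ring interface, getLowestUnsentIndex) are illustrative stand-ins for the class's real fields, not its actual API:

```typescript
// Sketch: pause reading while the ring buffer is full of unsent chunks,
// instead of overwriting them.
async function bufferWithBackoff(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  ring: { add(index: number, data: Uint8Array): void },
  maxBufferSize: number,
  getLowestUnsentIndex: () => number
): Promise<void> {
  let chunkIndex = 0;
  while (true) {
    // Back off until there is a slot that does not hold an unsent chunk.
    while (chunkIndex - getLowestUnsentIndex() >= maxBufferSize) {
      await new Promise((resolve) => setTimeout(resolve, 10));
    }
    const { done, value } = await reader.read();
    if (done || !value) return;
    ring.add(chunkIndex, value);
    chunkIndex++;
  }
}
```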
179-181: Guard the loop when the request is destroyed/ended.

Avoid writing after an error or a retried request to the same req.

Apply:

```diff
- while (true) {
+ while (true) {
+   if (req.destroyed || req.writableEnded || req.writableFinished) break;
```
169-174: Honor already-aborted signals.

If the signal is already aborted when makeRequest starts, bail immediately to avoid hanging.

Apply:

```diff
  if (this.options.signal) {
-   this.options.signal.addEventListener("abort", () => {
-     req.destroy(new Error("Request aborted"));
-   });
+   const onAbort = () => req.destroy(new Error("Request aborted"));
+   if (this.options.signal.aborted) onAbort();
+   this.options.signal.addEventListener("abort", onAbort);
  }
```
247-254: Expand retryable-error detection (optional).

Consider also treating 'ECONNABORTED' and the message 'Request timed out' as retryable, to be safe across environments.

Apply:

```diff
  const retryableErrors = [
    "ECONNRESET",
    "ECONNREFUSED",
    "ETIMEDOUT",
+   "ECONNABORTED",
    "ENOTFOUND",
    "EPIPE",
    "socket hang up",
  ];
@@
- if (error.message && error.message.includes("socket hang up")) {
+ if (error.message && (error.message.includes("socket hang up") || error.message.includes("Request timed out"))) {
    return true;
  }
```

Also applies to: 261-266
294-307: Unused helper.

getChunksFromBuffer() isn't referenced. Remove it, or use it to simplify the processStream lookup.

35-36: bufferStartIndex is written but never read.

Either use it to fast-fail on missing chunks or remove it to reduce cognitive load.

Also applies to: 290-291
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- references/hello-world/src/trigger/realtime.ts is excluded by !references/**
📒 Files selected for processing (9)
- .changeset/quiet-falcons-approve.md (1 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2 hunks)
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (5 hunks)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2 hunks)
- apps/webapp/app/services/realtime/types.ts (1 hunks)
- docker/config/toxiproxy.json (1 hunks)
- docker/docker-compose.yml (1 hunks)
- packages/core/src/v3/runMetadata/metadataStream.ts (7 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- packages/core/src/v3/runMetadata/metadataStream.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- packages/core/src/v3/runMetadata/metadataStream.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
🧬 Code graph analysis (3)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
- packages/core/src/logger.ts (1): error (69-79)

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1): loader (30-70)
- apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1): createLoaderApiRoute (97-287)

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1): relayRealtimeStreams (275-278)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
🔇 Additional comments (3)
docker/config/toxiproxy.json (1)
1-8: Configuration is valid and appropriate for local testing.

The toxiproxy configuration is well-formed and correctly defines a proxy to simulate network conditions during development. The setup aligns well with the PR's goal of testing ECONNRESET recovery.
.changeset/quiet-falcons-approve.md (1)
5-5: Changelog entry looks good. Clear, concise, and scoped to the fix.

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
250-253: Proxying getLastChunkIndex to fallback is fine.
```ts
const loader = createLoaderApiRoute(
  {
    params: ParamsSchema,
    allowJWT: false,
    corsStrategy: "none",
    findResource: async (params, authentication) => {
      return $replica.taskRun.findFirst({
        where: {
          friendlyId: params.runId,
          runtimeEnvironmentId: authentication.environment.id,
        },
        select: {
          id: true,
          friendlyId: true,
          parentTaskRun: {
            select: {
              friendlyId: true,
            },
          },
          rootTaskRun: {
            select: {
              friendlyId: true,
            },
          },
        },
      });
    },
  },
  async ({ request, params, resource: run }) => {
    if (!run) {
      return new Response("Run not found", { status: 404 });
    }

    const targetId =
      params.target === "self"
        ? run.friendlyId
        : params.target === "parent"
          ? run.parentTaskRun?.friendlyId
          : run.rootTaskRun?.friendlyId;

    if (!targetId) {
      return new Response("Target not found", { status: 404 });
    }

    // Handle HEAD request to get last chunk index
    if (request.method !== "HEAD") {
      return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 });
    }

    const lastChunkIndex = await relayRealtimeStreams.getLastChunkIndex(targetId, params.streamId);

    return new Response(null, {
      status: 200,
      headers: {
        "X-Last-Chunk-Index": lastChunkIndex.toString(),
      },
    });
  }
```
Add authorization (and consider allowJWT parity) to the loader.
This loader currently authenticates but does not enforce scopes/resources, unlike the sibling loader. If it should follow the same access model, add authorization and, if needed, allowJWT.
Proposed update:
```diff
 const loader = createLoaderApiRoute(
   {
     params: ParamsSchema,
-    allowJWT: false,
-    corsStrategy: "none",
+    allowJWT: true,
+    corsStrategy: "none",
+    authorization: {
+      action: "read",
+      resource: (run) => ({
+        runs: run.friendlyId,
+        // include others if selected (tags/batch/tasks) for parity with sibling route
+      }),
+      superScopes: ["read:runs", "read:all", "admin"],
+    },
```

If you want full parity, expand the select for run to include fields used by the authorization resource (e.g., runTags, batch.friendlyId, taskIdentifier) as in apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts. Based on coding guidelines.
🤖 Prompt for AI Agents
In apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts around lines 72 to 129, the loader authenticates but does not enforce authorization/scopes or include the resource fields used by the authorization check. Update the loader to call the same authorization routine used by the sibling loader (validate the authenticated user/environment against the target run resource and required scopes), add allowJWT parity if the sibling allows JWT, and expand the findResource select to include the additional fields needed by the authorization resource (e.g., runTags, batch.friendlyId, taskIdentifier, or any other properties the authorization logic inspects) so the authorization step can run before returning the last-chunk index. Ensure unauthorized requests return the appropriate 401/403 responses and preserve the existing HEAD-only behavior.
```ts
this.bufferReaderTask = (async () => {
  try {
    let chunkIndex = 0;
    while (true) {
      const { done, value } = await this.streamReader!.read();

      if (done) {
        this.streamComplete = true;
        break;
      }

      // Add to ring buffer
      this.addToRingBuffer(chunkIndex, value);
      this.highestBufferedIndex = chunkIndex;
      chunkIndex++;
    }
  } catch (error) {
    throw error;
  }
})();
}
```
Unhandled rejection and reader lock not released in startBuffering().

On read() error, you throw inside an unawaited task, risking an unhandled rejection and leaving the reader locked. Mark completion and always release the lock.

Apply:

```diff
   this.bufferReaderTask = (async () => {
     try {
       let chunkIndex = 0;
       while (true) {
         const { done, value } = await this.streamReader!.read();
         if (done) {
           this.streamComplete = true;
           break;
         }
         // Add to ring buffer
         this.addToRingBuffer(chunkIndex, value);
         this.highestBufferedIndex = chunkIndex;
         chunkIndex++;
       }
-    } catch (error) {
-      throw error;
-    }
-  })();
+    } catch (error) {
+      this.streamComplete = true; // terminate send loop gracefully
+    } finally {
+      try { this.streamReader?.releaseLock(); } catch {}
+    }
+  })().catch(() => {}); // avoid unhandled rejection noise
```

```ts
req.on("timeout", () => {
  safeReleaseLock(reader);

  req.destroy(new Error("Request timed out"));
});
```
Client timeout isn't retried (bug).

You destroy with a generic Error, so isRetryableError() won't match and the flow won't resume. Tag the error as ETIMEDOUT.

Apply:

```diff
- req.on("timeout", () => {
-   req.destroy(new Error("Request timed out"));
- });
+ req.on("timeout", () => {
+   const err: any = new Error("Request timed out");
+   err.code = "ETIMEDOUT";
+   req.destroy(err);
+ });
```

🤖 Prompt for AI Agents
In packages/core/src/v3/runMetadata/metadataStream.ts around lines 138 to 141, the code destroys the request with a generic Error("Request timed out") so isRetryableError() won't detect it. Create the Error, set its code property to "ETIMEDOUT" (e.g. const err = new Error("Request timed out"); (err as any).code = "ETIMEDOUT";) and pass that error to req.destroy(err) so the retry logic recognizes a timeout.
```ts
const stringified = JSON.stringify(chunk.data) + "\n";
req.write(stringified);
this.currentChunkIndex = lastSentIndex + 1;
} else {
```
Honor backpressure when writing to the request stream.

Ignoring req.write()'s boolean return can buffer unboundedly and spike memory. Await 'drain' when needed.

Apply:

```diff
- const stringified = JSON.stringify(chunk.data) + "\n";
- req.write(stringified);
+ const stringified = JSON.stringify(chunk.data) + "\n";
+ const ok = req.write(stringified);
+ if (!ok) {
+   await new Promise<void>((r) => req.once("drain", r));
+ }
  this.currentChunkIndex = lastSentIndex + 1;
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const stringified = JSON.stringify(chunk.data) + "\n";
const ok = req.write(stringified);
if (!ok) {
  await new Promise<void>((r) => req.once("drain", r));
}
this.currentChunkIndex = lastSentIndex + 1;
} else {
```
🤖 Prompt for AI Agents
In packages/core/src/v3/runMetadata/metadataStream.ts around lines 186 to 189, the code ignores the boolean result of req.write(), which can lead to unbounded buffering. After calling req.write(stringified), check its return value; if it returns false, await a Promise that resolves on req.once('drain', resolve) before continuing and advancing this.currentChunkIndex, ensuring backpressure is honored (also propagate errors if the stream emits 'error' or the request is aborted).
Actionable comments posted: 4
Caution: some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
179-179: Fix inconsistent error logging in cleanup.

The cleanup function uses console.error while the rest of the method uses this.logger.error. This inconsistency makes logs harder to trace and filter.

Apply this diff:

```diff
- await redis.quit().catch(console.error);
+ await redis.quit().catch((err) => {
+   this.logger.error("[RealtimeStreams][streamResponse] Error in cleanup:", { err });
+ });
```
♻️ Duplicate comments (5)
packages/core/src/v3/runMetadata/metadataStream.ts (4)
79-103: Release reader lock and avoid unhandled rejection in startBuffering().

Reader lock isn't released on error/EOF, and throwing in the unawaited task risks an unhandled rejection. Mark completion, always release the lock, and swallow the task rejection.

```diff
 private startBuffering(): void {
   this.streamReader = this.serverStream.getReader();
   this.bufferReaderTask = (async () => {
     try {
       let chunkIndex = 0;
       while (true) {
         const { done, value } = await this.streamReader!.read();
         if (done) {
           this.streamComplete = true;
           break;
         }
         // Add to ring buffer
         this.addToRingBuffer(chunkIndex, value);
         this.highestBufferedIndex = chunkIndex;
         chunkIndex++;
       }
-    } catch (error) {
-      throw error;
-    }
-  })();
+    } catch (error) {
+      this.streamComplete = true;
+    } finally {
+      try { this.streamReader?.releaseLock(); } catch {}
+    }
+  })().catch(() => {});
 }
```
147-150: Timeout should be classified as retryable (ETIMEDOUT).

A generic Error prevents isRetryableError() from matching; add code = "ETIMEDOUT".

```diff
- req.on("timeout", () => {
-   req.destroy(new Error("Request timed out"));
- });
+ req.on("timeout", () => {
+   const err: any = new Error("Request timed out");
+   err.code = "ETIMEDOUT";
+   req.destroy(err);
+ });
```
151-158: HTTP 408 should back off and re-sync the resume offset via HEAD.

Immediate retry with the same offset risks gaps/dupes. Back off, query the last chunk, and resume from the next one.

```diff
 if (res.statusCode === 408) {
   if (this.retryCount < this.maxRetries) {
     this.retryCount++;
-    resolve(this.makeRequest(startFromChunk));
+    (async () => {
+      const delayMs = this.calculateBackoffDelay();
+      await this.delay(delayMs);
+      const last = await this.queryServerLastChunkIndex();
+      resolve(this.makeRequest((last ?? -1) + 1));
+    })().catch(reject);
     return;
   }
```
195-198: Honor backpressure from req.write().

Ignoring the boolean return can cause unbounded buffering and memory spikes. Await 'drain' when false.

```diff
- const stringified = JSON.stringify(chunk.data) + "\n";
- req.write(stringified);
+ const stringified = JSON.stringify(chunk.data) + "\n";
+ const ok = req.write(stringified);
+ if (!ok) {
+   await new Promise<void>((r) => req.once("drain", r));
+ }
  this.currentChunkIndex = lastSentIndex + 1;
```

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)
76-103: Add authorization + allowJWT parity to loader (matches sibling route).

Currently authenticates but does not authorize; parity is recommended with realtime.v1.streams.$runId.$streamId.ts.

```diff
-const loader = createLoaderApiRoute(
+const loader = createLoaderApiRoute(
   {
     params: ParamsSchema,
-    allowJWT: false,
+    allowJWT: true,
     corsStrategy: "none",
     findResource: async (params, authentication) => {
       return $replica.taskRun.findFirst({
         where: {
           friendlyId: params.runId,
           runtimeEnvironmentId: authentication.environment.id,
         },
-        select: {
-          id: true,
-          friendlyId: true,
-          parentTaskRun: { select: { friendlyId: true } },
-          rootTaskRun: { select: { friendlyId: true } },
-        },
+        select: {
+          id: true,
+          friendlyId: true,
+          runTags: true,
+          taskIdentifier: true,
+          batch: { select: { friendlyId: true } },
+          parentTaskRun: { select: { friendlyId: true } },
+          rootTaskRun: { select: { friendlyId: true } },
+        },
       });
     },
+    authorization: {
+      action: "read",
+      resource: (run) => ({
+        runs: run.friendlyId,
+        tags: run.runTags,
+        batch: run.batch?.friendlyId,
+        tasks: run.taskIdentifier,
+      }),
+      superScopes: ["read:runs", "read:all", "admin"],
+    },
   },
```

Based on coding guidelines.
🧹 Nitpick comments (11)
packages/core/src/v3/runMetadata/metadataStream.ts (6)
192-204: O(n) lookup per chunk; use an O(1) slot check.

Using Array.find() scans up to maxBufferSize entries on every write. Use the ring slot and verify the stored index.

```diff
- const chunk = this.ringBuffer.find((c) => c.index === lastSentIndex);
+ const slot = this.ringBuffer[lastSentIndex % this.maxBufferSize];
+ const chunk = slot && slot.index === lastSentIndex ? slot : undefined;
```
116-121: Consider the NDJSON content type.

If the server expects newline-delimited JSON, prefer application/x-ndjson. As this impacts the server contract, confirm first.

```diff
  headers: {
    ...this.options.headers,
-   "Content-Type": "application/json",
+   "Content-Type": "application/x-ndjson",
    "X-Client-Id": this.clientId,
    "X-Resume-From-Chunk": startFromChunk.toString(),
  },
```
178-182: Use once: true for the abort listener to avoid leaks.

The listener persists across retries; ensure it runs once.

```diff
- if (this.options.signal) {
-   this.options.signal.addEventListener("abort", () => {
-     req.destroy(new Error("Request aborted"));
-   });
- }
+ if (this.options.signal) {
+   this.options.signal.addEventListener(
+     "abort",
+     () => req.destroy(new Error("Request aborted")),
+     { once: true }
+   );
+ }
```
58-61: Prefer isomorphic ID generation (randomUUID) over node:crypto when possible.

Improves portability; falls back to randomBytes if needed. As per coding guidelines.

```diff
- private generateClientId(): string {
-   return randomBytes(4).toString("hex");
- }
+ private generateClientId(): string {
+   try {
+     // Web Crypto / Node 19+ supports crypto.randomUUID()
+     // @ts-ignore
+     if (typeof globalThis.crypto?.randomUUID === "function") {
+       // Shorten but keep uniqueness properties sufficient for this use
+       return globalThis.crypto.randomUUID().slice(0, 8).replace(/-/g, "");
+     }
+   } catch {}
+   return randomBytes(4).toString("hex");
+ }
```
1-2: Consider switching to fetch for portability and simpler timeouts.

Using node:http ties this to Node; fetch works in Node/Web and aligns with our isomorphic preference. It also simplifies HEAD/POST and abort handling via AbortSignal and controller timeouts. As per coding guidelines.
Would you like a follow-up patch that refactors makeRequest()/queryServerLastChunkIndex() to fetch with a ReadableStream body and consistent retry hooks?
Also applies to: 109-123
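As a sketch of what the fetch-based HEAD query could look like, assuming the X-Last-Chunk-Index header contract discussed elsewhere in this review (the URL, headers, and timeout wiring are illustrative, not the file's current code):

```typescript
// Sketch: HEAD the stream endpoint and read the last chunk index the server has.
async function queryLastChunkIndex(
  url: string,
  headers: Record<string, string>,
  timeoutMs = 10_000
): Promise<number> {
  const response = await fetch(url, {
    method: "HEAD",
    headers,
    // AbortSignal.timeout replaces manual req.setTimeout bookkeeping.
    signal: AbortSignal.timeout(timeoutMs),
  });
  const header = response.headers.get("X-Last-Chunk-Index");
  const parsed = header !== null ? Number(header) : NaN;
  // -1 means "nothing received yet", matching the server-side convention.
  return Number.isInteger(parsed) ? parsed : -1;
}
```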
303-315: Unused helper getChunksFromBuffer().

It's not referenced; remove it to reduce surface area, or wire it into processStream. Run a quick search to confirm no external references before removal.
apps/webapp/app/env.server.ts (1)
201-204: Clamp the inactivity timeout to sane bounds.

Prevent an accidental 0/negative value via a min constraint.

```diff
- REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce
-   .number()
-   .int()
-   .default(60000 * 5), // 5 minutes
+ REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce
+   .number()
+   .int()
+   .min(1_000) // >= 1s
+   .default(60_000 * 5), // 5 minutes
```

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
121-149: Harden ingestData against invalid resumeFromChunk.

Defensively sanitize at the relay boundary to avoid forwarding NaN/negative values.

```diff
  async ingestData(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string,
    clientId: string,
    resumeFromChunk?: number
  ): Promise<Response> {
    const [localStream, fallbackStream] = stream.tee();

    logger.debug("[RelayRealtimeStreams][ingestData] Ingesting data", {
      runId,
      streamId,
      clientId,
      resumeFromChunk,
    });

+   const startFrom =
+     resumeFromChunk !== undefined && Number.isInteger(resumeFromChunk) && resumeFromChunk >= 0
+       ? resumeFromChunk
+       : undefined;

    // Handle local buffering asynchronously and catch errors
    this.handleLocalIngestion(localStream, runId, streamId).catch((err) => {
      logger.error("[RelayRealtimeStreams][ingestData] Error in local ingestion:", { err });
    });

    // Forward to the fallback ingestor asynchronously and catch errors
    return this.options.fallbackIngestor.ingestData(
      fallbackStream,
      runId,
      streamId,
      clientId,
-     resumeFromChunk
+     startFrom
    );
  }
```
1123-1127: Reduce flakiness in inactivity timing assertions.Elapsed bounds are tied to internal BLOCK interval (~5s). Make them looser or derive from a shared constant to avoid timing flakes on slow CI.
Example:
- widen to 3.5s–12s, or
- export blockTimeMs from implementation and import here to compute expected range.
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
104-136: Consider extracting the duplicated inactivity check.The inactivity timeout logic appears in two places (lines 104-119 and 120-136) with identical threshold checks and logging. Extracting this into a helper function would improve maintainability.
Example refactor:
const checkInactivity = () => { const inactiveMs = Date.now() - lastDataTime; if (inactiveMs >= this.inactivityTimeoutMs) { this.logger.debug( "[RealtimeStreams][streamResponse] Closing stream due to inactivity", { streamKey, inactiveMs, threshold: this.inactivityTimeoutMs, } ); controller.close(); return true; } return false; };Then replace both blocks with
if (checkInactivity()) return;
206-214: Consider using an arrow function for cleanup.

The `self` variable (line 206) is a workaround to access `this.logger` inside the cleanup function. Using an arrow function would be more idiomatic.

Apply this diff:

```diff
- const self = this;
-
- async function cleanup() {
+ const cleanup = async () => {
    try {
      await redis.quit();
    } catch (error) {
-     self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
+     this.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
    }
- }
+ };
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- references/hello-world/src/trigger/realtime.ts is excluded by !references/**
📒 Files selected for processing (9)
- apps/webapp/app/env.server.ts (1 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2 hunks)
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (7 hunks)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (4 hunks)
- apps/webapp/app/services/realtime/types.ts (1 hunks)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1 hunks)
- apps/webapp/test/redisRealtimeStreams.test.ts (1 hunks)
- packages/core/src/v3/runMetadata/metadataStream.ts (8 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- packages/core/src/v3/runMetadata/metadataStream.ts
- apps/webapp/test/redisRealtimeStreams.test.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- packages/core/src/v3/runMetadata/metadataStream.ts
- apps/webapp/test/redisRealtimeStreams.test.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/test/redisRealtimeStreams.test.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
- apps/webapp/app/env.server.ts
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
- apps/webapp/app/services/realtime/types.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Our tests are all vitest
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}: Do not import app/env.server.ts into tests, either directly or indirectly
Tests should only import classes/functions from files under apps/webapp/app/**/*.ts
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
**/*.{test,spec}.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
🧬 Code graph analysis (5)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
- apps/webapp/app/env.server.ts (1): env (1211-1211)

apps/webapp/test/redisRealtimeStreams.test.ts (2)
- internal-packages/testcontainers/src/index.ts (1): redisTest (167-167)
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1): RedisRealtimeStreams (18-352)

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (2)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1): relayRealtimeStreams (265-268)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1): v1RealtimeStreams (20-20)

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (4)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1): v1RealtimeStreams (20-20)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1): loader (51-90)
- apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1): createLoaderApiRoute (97-287)
- apps/webapp/app/db.server.ts (1): $replica (103-106)

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
- packages/core/src/logger.ts (3): Logger (19-139), LogLevel (15-15), error (69-79)
- apps/webapp/app/services/realtime/types.ts (2): StreamIngestor (4-14), StreamResponder (17-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (22)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
16-17: LGTM: inactivity timeout wiring.

Correctly threads env.REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS to RedisRealtimeStreams.
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
240-244: LGTM: getLastChunkIndex delegation.

Forwarding to the fallback ingestor is appropriate here.
apps/webapp/app/services/realtime/types.ts (1)
5-14: All breaking changes have been addressed across the codebase.

Verification confirms both StreamIngestor implementors (RedisRealtimeStreams, RelayRealtimeStreams) and all call sites have been updated:

- ingestData: all 5 parameters present (stream, runId, streamId, clientId, resumeFromChunk)
- streamResponse: all 4 parameters present (request, runId, streamId, signal)

Fallback calls at lines 143, 71, and 82 in relayRealtimeStreams.server.ts correctly pass the new signatures. Route handlers at the realtime.v1.streams routes correctly invoke the methods with updated argument counts.
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (4)
5-5: LGTM: Logger integration follows guidelines.

The Logger import uses the correct subpath export from @trigger.dev/core/logger, and the new options (logger, logLevel, inactivityTimeoutMs) provide good configurability with reasonable defaults.

Also applies to: 9-11
22-25: LGTM: Constructor properly initializes logger and inactivity timeout.

The fallback defaults (info log level, 60-second timeout) are sensible.
193-274: LGTM: Chunk-based ingestion with proper resumption support.

The method correctly:

- Starts chunk indexing from resumeFromChunk (line 204)
- Writes clientId and chunkIndex to Redis for each chunk (lines 243-246)
- Uses TTL-based cleanup (line 255) instead of sentinel values
- Logs ECONNRESET at info level (line 261) for graceful recovery per the PR objectives

A sketch of the Redis layout this implies follows this list.
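For readers unfamiliar with the stream layout described above, a minimal ioredis sketch under assumed names (the stream key shape, field names, and TTL value are illustrative, not the file's exact code):

```typescript
import Redis from "ioredis";

// Sketch: append one chunk to the run's stream with per-client indexing,
// then refresh a TTL so abandoned streams are cleaned up automatically.
async function writeChunk(
  redis: Redis,
  runId: string,
  streamId: string,
  clientId: string,
  chunkIndex: number,
  value: string
): Promise<void> {
  const streamKey = `stream:${runId}:${streamId}`; // assumed key shape
  await redis.xadd(
    streamKey,
    "*", // let Redis assign the entry ID
    "clientId", clientId,
    "chunkIndex", String(chunkIndex),
    "data", value
  );
  // TTL-based cleanup instead of an END_SENTINEL entry.
  await redis.expire(streamKey, 60 * 60 * 24); // assumed 24h retention
}
```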
276-351: LGTM: getLastChunkIndex correctly addresses the past review.

This implementation properly:

- Scans backwards using XREVRANGE (line 286) from newest to oldest
- Skips END_SENTINEL entries for backward compatibility (lines 318-321)
- Filters by clientId and returns the first matching chunk with a valid chunkIndex (lines 324-330)
- Handles pagination with exclusive range syntax `(${id}` (line 336)
- Returns -1 on errors or no matching chunks

The signature matches the StreamIngestor interface with the clientId parameter, enabling per-client chunk tracking as intended by the PR. Based on learnings. A sketch of the backward-scan pattern follows this list.
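A minimal ioredis sketch of that backward scan, under assumed field names; exclusive range syntax requires Redis 6.2+:

```typescript
import Redis from "ioredis";

// Sketch: walk the stream newest-to-oldest in pages, returning the highest
// chunkIndex written by this client, or -1 if none is found.
async function getLastChunkIndex(
  redis: Redis,
  streamKey: string,
  clientId: string,
  pageSize = 100
): Promise<number> {
  let cursor = "+"; // start from the newest entry
  while (true) {
    const entries = await redis.xrevrange(streamKey, cursor, "-", "COUNT", pageSize);
    if (entries.length === 0) return -1;

    for (const [id, fields] of entries) {
      // fields is a flat [key, value, ...] array; fold it into a record.
      const record: Record<string, string> = {};
      for (let i = 0; i < fields.length; i += 2) record[fields[i]] = fields[i + 1];

      if (record.clientId === clientId && record.chunkIndex !== undefined) {
        const idx = Number(record.chunkIndex);
        if (Number.isInteger(idx)) return idx;
      }
      cursor = `(${id}`; // exclusive range: continue strictly before this entry
    }

    if (entries.length < pageSize) return -1; // reached the oldest entry
  }
}
```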
```ts
const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;

return relayRealtimeStreams.ingestData(
  request.body,
  $params.runId,
  $params.streamId,
  clientId,
  resumeFromChunkNumber
);
```
Guard against NaN/invalid X-Resume-From-Chunk.

Same parsing issue here.

```diff
- const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
- const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+ const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+ const n =
+   resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined;
+ if (n !== undefined && (!Number.isInteger(n) || n < 0)) {
+   return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+ }
+ const resumeFromChunkNumber = n;
```
+ const resumeFromChunkNumber = n;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const resumeFromChunk = request.headers.get("X-Resume-From-Chunk"); | |
| const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined; | |
| return relayRealtimeStreams.ingestData( | |
| request.body, | |
| $params.runId, | |
| $params.streamId, | |
| clientId, | |
| resumeFromChunkNumber | |
| ); | |
| const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk"); | |
| const n = | |
| resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined; | |
| if (n !== undefined && (!Number.isInteger(n) || n < 0)) { | |
| return new Response("Invalid X-Resume-From-Chunk", { status: 400 }); | |
| } | |
| const resumeFromChunkNumber = n; | |
| return relayRealtimeStreams.ingestData( | |
| request.body, | |
| $params.runId, | |
| $params.streamId, | |
| clientId, | |
| resumeFromChunkNumber | |
| ); |
🤖 Prompt for AI Agents
In apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts around lines 39-48, the X-Resume-From-Chunk header is parsed with parseInt but not validated, so parseInt can produce NaN. Update the code to check the parsed value (e.g. const n = parseInt(..., 10); if (!Number.isFinite(n) || Number.isNaN(n)) { resumeFromChunkNumber = undefined } else { resumeFromChunkNumber = n }), optionally ensure it's an integer and non-negative, and then pass the validated resumeFromChunkNumber to ingestData (or return a 400 if you prefer strict validation).
```ts
// Extract client ID from header, default to "default" if not provided
const clientId = request.headers.get("X-Client-Id") || "default";

if (!request.body) {
  return new Response("No body provided", { status: 400 });
}

const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;

return v1RealtimeStreams.ingestData(
  request.body,
  targetId,
  params.streamId,
  clientId,
  resumeFromChunkNumber
);
}
```
Guard against NaN/invalid X-Resume-From-Chunk.

parseInt may yield NaN; passing it downstream will store "NaN" indices.

```diff
- const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
- const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+ const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+ const n =
+   resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined;
+ if (n !== undefined && (!Number.isInteger(n) || n < 0)) {
+   return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+ }
+ const resumeFromChunkNumber = n;
```
+ const resumeFromChunkNumber = n;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Extract client ID from header, default to "default" if not provided | |
| const clientId = request.headers.get("X-Client-Id") || "default"; | |
| if (!request.body) { | |
| return new Response("No body provided", { status: 400 }); | |
| } | |
| const resumeFromChunk = request.headers.get("X-Resume-From-Chunk"); | |
| const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined; | |
| return v1RealtimeStreams.ingestData( | |
| request.body, | |
| targetId, | |
| params.streamId, | |
| clientId, | |
| resumeFromChunkNumber | |
| ); | |
| } | |
| // Extract client ID from header, default to "default" if not provided | |
| const clientId = request.headers.get("X-Client-Id") || "default"; | |
| if (!request.body) { | |
| return new Response("No body provided", { status: 400 }); | |
| } | |
| const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk"); | |
| const n = | |
| resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined; | |
| if (n !== undefined && (!Number.isInteger(n) || n < 0)) { | |
| return new Response("Invalid X-Resume-From-Chunk", { status: 400 }); | |
| } | |
| const resumeFromChunkNumber = n; | |
| return v1RealtimeStreams.ingestData( | |
| request.body, | |
| targetId, | |
| params.streamId, | |
| clientId, | |
| resumeFromChunkNumber | |
| ); | |
| } |
🤖 Prompt for AI Agents
In apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts around lines 56 to 73, the X-Resume-From-Chunk header is parsed with parseInt, which can produce NaN and propagate invalid indices. Validate the parsed value by checking Number.isInteger and !Number.isNaN and ensure it's non-negative; if invalid, treat it as undefined (or return a 400 error if you prefer strict validation). Convert the header to a safe number only when valid and otherwise pass undefined to v1RealtimeStreams.ingestData so downstream code never receives NaN.
```ts
req.on("error", async (error) => {
  // Check if this is a retryable connection error
  if (this.isRetryableError(error)) {
    if (this.retryCount < this.maxRetries) {
      this.retryCount++;
      const delayMs = this.calculateBackoffDelay();

      await this.delay(delayMs);

      // Query server to find out what the last chunk it received was
      const serverLastChunk = await this.queryServerLastChunkIndex();

      // Resume from the next chunk after what the server has
      const resumeFromChunk = serverLastChunk + 1;

      resolve(this.makeRequest(resumeFromChunk));
      return;
    }
```
Stop writing to a failed/closed request; avoid double writers after retry.

On 'error' you recursively resolve a new makeRequest(), but the old processStream loop keeps writing to the closed socket, risking write-after-end and noisy errors. Track request closure and bail out of the loop.

```diff
   const req = requestFn({
     method: "POST",
 @@
   });
-  req.on("error", async (error) => {
+  let requestClosed = false;
+  const markClosed = () => {
+    requestClosed = true;
+  };
+  req.once("error", markClosed);
+  req.once("close", markClosed);
+  req.once("finish", markClosed);
+
+  req.on("error", async (error) => {
     // Check if this is a retryable connection error
     if (this.isRetryableError(error)) {
       if (this.retryCount < this.maxRetries) {
         this.retryCount++;
         const delayMs = this.calculateBackoffDelay();
         await this.delay(delayMs);
         // Query server to find out what the last chunk it received was
         const serverLastChunk = await this.queryServerLastChunkIndex();
         // Resume from the next chunk after what the server has
         const resumeFromChunk = serverLastChunk + 1;
         resolve(this.makeRequest(resumeFromChunk));
         return;
       }
     }
     reject(error);
   });
 @@
   const processStream = async () => {
     try {
       let lastSentIndex = startFromChunk - 1;
       while (true) {
+        if (requestClosed) {
+          break;
+        }
         // Send all chunks that are available in buffer
         while (lastSentIndex < this.highestBufferedIndex) {
           lastSentIndex++;
-          const chunk = this.ringBuffer.find((c) => c.index === lastSentIndex);
+          if (requestClosed) break;
+          const slot = this.ringBuffer[lastSentIndex % this.maxBufferSize];
+          const chunk = slot && slot.index === lastSentIndex ? slot : undefined;
           if (chunk) {
             const stringified = JSON.stringify(chunk.data) + "\n";
-            req.write(stringified);
+            const ok = req.write(stringified);
+            if (!ok) {
+              await new Promise<void>((r) => req.once("drain", r));
+              if (requestClosed) break;
+            }
             this.currentChunkIndex = lastSentIndex + 1;
           } else {
-            // Chunk not in buffer (outside ring buffer window)
-            // This can happen if the ring buffer size is too small
-            console.warn(
-              `[metadataStream] Chunk ${lastSentIndex} not in ring buffer (outside window), cannot recover`
-            );
+            // Required chunk was evicted; cannot guarantee no data loss.
+            const err = new Error(
+              `[metadataStream] Required chunk ${lastSentIndex} was evicted from the ring buffer; increase maxBufferSize or provide a rewindable source`
+            );
+            req.destroy(err);
+            throw err;
           }
         }
         // If stream is complete and we've sent all buffered chunks, we're done
-        if (this.streamComplete && lastSentIndex >= this.highestBufferedIndex) {
+        if ((this.streamComplete && lastSentIndex >= this.highestBufferedIndex) || requestClosed) {
           req.end();
           break;
         }
         // Wait a bit for more chunks to be buffered
         await this.delay(10);
       }
     } catch (error) {
       reject(error);
     }
   };
```

Also applies to: 184-215
🤖 Prompt for AI Agents
In packages/core/src/v3/runMetadata/metadataStream.ts around lines 124-141 (and similarly 184-215), the error handler currently resolves a new makeRequest() but leaves the original request/stream loop running and potentially writing to a closed socket. Modify the flow to mark the current request as closed and stop the streaming loop before starting a retry. Concretely: set a boolean flag (e.g., this.requestClosed = true) or call a teardown helper when the request emits 'error' or 'close', have the processStream/writing loop check that flag and return early to avoid further writes, remove or clean up request event listeners before resolving the retry, and ensure resolve(this.makeRequest(...)) is only reached after cleanup so no double writer can write to a failed/closed request.
… stream data from tasks to the server
…ND_SENTINEL system
Force-pushed from 8e9d6a2 to d2179cd (compare).
Actionable comments posted: 3
♻️ Duplicate comments (5)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
79-102: Buffering task: unhandled rejection and reader lock leakage.

Errors thrown inside the task can become unhandled; the reader lock isn't released on exit. Mark completion, always release the lock, and swallow the task rejection.

```diff
 private startBuffering(): void {
   this.streamReader = this.serverStream.getReader();
   this.bufferReaderTask = (async () => {
     try {
       let chunkIndex = 0;
       while (true) {
         const { done, value } = await this.streamReader!.read();
         if (done) {
           this.streamComplete = true;
           break;
         }
         // Add to ring buffer
         this.addToRingBuffer(chunkIndex, value);
         this.highestBufferedIndex = chunkIndex;
         chunkIndex++;
       }
-    } catch (error) {
-      throw error;
-    }
-  })();
+    } catch {
+      this.streamComplete = true;
+    } finally {
+      try { this.streamReader?.releaseLock(); } catch {}
+    }
+  })().catch(() => {});
 }
```

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2)
63-64: Validate X-Resume-From-Chunk to prevent NaN propagation.
parseInt can produce NaN if the header contains invalid data, which would then be passed to ingestData and stored as string "NaN" in Redis chunk indices.

Apply this diff to validate the parsed value:

```diff
- const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
- const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+ const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+ let resumeFromChunkNumber: number | undefined = undefined;
+ if (resumeFromChunkHeader !== null) {
+   const n = Number(resumeFromChunkHeader);
+   if (!Number.isInteger(n) || n < 0) {
+     return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+   }
+   resumeFromChunkNumber = n;
+ }
```
76-102: Add authorization to the loader.

The loader authenticates but does not enforce authorization scopes or resource checks. The sibling route realtime.v1.streams.$runId.$streamId.ts (lines 70-79) enforces authorization with action "read" and scopes ["read:runs", "read:all", "admin"]. This loader should follow the same access model.

Apply this diff to add authorization parity:

```diff
 const loader = createLoaderApiRoute(
   {
     params: ParamsSchema,
-    allowJWT: false,
+    allowJWT: true,
     corsStrategy: "none",
+    authorization: {
+      action: "read",
+      resource: (run) => ({
+        runs: run.friendlyId,
+        tags: run.runTags,
+        batch: run.batch?.friendlyId,
+        tasks: run.taskIdentifier,
+      }),
+      superScopes: ["read:runs", "read:all", "admin"],
+    },
     findResource: async (params, authentication) => {
       return $replica.taskRun.findFirst({
         where: {
           friendlyId: params.runId,
           runtimeEnvironmentId: authentication.environment.id,
         },
         select: {
           id: true,
           friendlyId: true,
+          runTags: true,
+          taskIdentifier: true,
+          batch: {
+            select: {
+              friendlyId: true,
+            },
+          },
           parentTaskRun: {
             select: {
               friendlyId: true,
             },
           },
           rootTaskRun: {
             select: {
               friendlyId: true,
             },
           },
         },
       });
     },
   },
```

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (2)
18-32: HEAD handling in the action is unreachable; move it to the loader.

Remix dispatches HEAD requests to the loader, not the action. This entire HEAD branch is dead code and will never execute. The HEAD logic must be moved into the loader before the streamResponse call.

Apply this diff to move HEAD handling to the loader:

```diff
 export async function action({ request, params }: ActionFunctionArgs) {
   const $params = ParamsSchema.parse(params);

   // Extract client ID from header, default to "default" if not provided
   const clientId = request.headers.get("X-Client-Id") || "default";

-  // Handle HEAD request to get last chunk index for this client
-  if (request.method === "HEAD") {
-    const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
-      $params.runId,
-      $params.streamId,
-      clientId
-    );
-
-    return new Response(null, {
-      status: 200,
-      headers: {
-        "X-Last-Chunk-Index": lastChunkIndex.toString(),
-      },
-    });
-  }

   if (!request.body) {
```

And add to the loader handler:

```diff
 async ({ params, request, resource: run, authentication }) => {
+  // Handle HEAD request to get last chunk index
+  if (request.method === "HEAD") {
+    const clientId = request.headers.get("X-Client-Id") || "default";
+    const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
+      run.friendlyId,
+      params.streamId,
+      clientId
+    );
+    return new Response(null, {
+      status: 200,
+      headers: { "X-Last-Chunk-Index": lastChunkIndex.toString() },
+    });
+  }
+
   // Get Last-Event-ID header for resuming from a specific position
   const lastEventId = request.headers.get("Last-Event-ID") || undefined;
```
38-39: Validate X-Resume-From-Chunk to prevent NaN propagation.

`parseInt` can produce `NaN` if the header contains invalid data, which would then be stored as the string "NaN" in Redis chunk indices. Apply this diff to validate the parsed value:

```diff
-  const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
-  const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+  const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+  let resumeFromChunkNumber: number | undefined = undefined;
+  if (resumeFromChunkHeader !== null) {
+    const n = Number(resumeFromChunkHeader);
+    if (!Number.isInteger(n) || n < 0) {
+      return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+    }
+    resumeFromChunkNumber = n;
+  }
```
🧹 Nitpick comments (11)
packages/core/src/v3/apiClient/runStream.ts (3)
168-172: Expose retry tuning via options and add jitter/max cap.

Hard-coded maxRetries/retryDelayMs limits flexibility, and retries can synchronize across clients. Add jitter and a max cap; plumb values from options.

```diff
-  private retryCount = 0;
-  private maxRetries = 5;
-  private retryDelayMs = 1000;
+  private retryCount = 0;
+  private maxRetries = 5; // default; allow override via factory options if present
+  private retryDelayMs = 1000; // base; add jitter and cap in retryConnection()
```

And in retryConnection():

```diff
-  const delay = this.retryDelayMs * Math.pow(2, this.retryCount - 1);
+  const base = this.retryDelayMs * Math.pow(2, this.retryCount - 1);
+  const jitter = Math.random() * 250;
+  const delay = Math.min(base + jitter, 30_000);
```
200-206: Validate SSE content-type.

Guard against misconfigured endpoints returning non-SSE. Fail fast with a clear error.

```diff
-  const response = await fetch(this.url, {
+  const response = await fetch(this.url, {
     headers,
     signal: this.options.signal,
   });
+  const ctype = response.headers.get("content-type") || "";
+  if (response.ok && !ctype.includes("text/event-stream")) {
+    throw new Error(`Expected text/event-stream, got ${ctype || "unknown"}`);
+  }
```
276-303: Avoid recursive reconnect stack growth.

retryConnection calls connectStream recursively; with many reconnects this can grow the stack. Prefer a loop.

```diff
-  // Reconnect
-  await this.connectStream(controller);
+  for (;;) {
+    await this.connectStream(controller);
+    if (this.retryCount === 0 || this.options.signal?.aborted) return;
+  }
```

packages/core/src/v3/runMetadata/metadataStream.ts (2)
277-303: Broaden retryable error detection.

Also treat Node ECONNABORTED and transient DNS failures (EAI_AGAIN) as retryable.

```diff
-  const retryableErrors = [
+  const retryableErrors = [
     "ECONNRESET",
     "ECONNREFUSED",
     "ETIMEDOUT",
     "ENOTFOUND",
     "EPIPE",
     "EHOSTUNREACH",
     "ENETUNREACH",
+    "ECONNABORTED",
+    "EAI_AGAIN",
     "socket hang up",
   ];
```
328-340: Ring buffer bookkeeping: track bufferStartIndex on growth too.

When below capacity, bufferStartIndex remains 0 even if startFromChunk > 0. Keep it consistent.

```diff
   if (this.ringBuffer.length < this.maxBufferSize) {
     // Buffer not full yet, just append
     this.ringBuffer.push(chunk);
+    if (this.ringBuffer.length === 1) {
+      this.bufferStartIndex = index;
+    }
   } else {
```

.changeset/quiet-falcons-approve.md (1)
1-5: Clarify protocol in changelog entry.

Mention headers to aid integrators scanning the changelog.

```diff
-gracefully recover from ECONNRESET errors when sending stream data from tasks to the server
+Gracefully recover from transient network errors when streaming task metadata to the server.
+Clients buffer and resume using:
+- X-Client-Id on POST/HEAD
+- X-Resume-From-Chunk on POST
+- X-Last-Chunk-Index on HEAD
+Defaults: ring buffer 1000 chunks, maxRetries 10, backoff base 1s (cap 30s).
```
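For orientation, a minimal sketch of the resume handshake those headers imply, assuming the ingest endpoint accepts HEAD and POST as described in this PR; the function name and `url` are illustrative, not SDK code:

```ts
// 1. Ask the server which chunk it last persisted for this client,
// 2. then resume the POST from the next index.
async function resumeUpload(url: string, clientId: string, body: BodyInit): Promise<number> {
  const head = await fetch(url, {
    method: "HEAD",
    headers: { "X-Client-Id": clientId },
  });
  const lastChunkIndex = Number(head.headers.get("X-Last-Chunk-Index") ?? "-1");

  const resumeFrom = lastChunkIndex + 1;
  await fetch(url, {
    method: "POST",
    headers: {
      "X-Client-Id": clientId,
      "X-Resume-From-Chunk": String(resumeFrom),
    },
    body, // re-sent chunks from the client-side ring buffer
  });

  return resumeFrom;
}
```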
docker/config/nginx.conf (2)

17-21: SSE timeouts are too low; align with inactivity timeout and add forwarded proto/port.

A 30s read timeout risks truncating idle SSE connections. Bump it to at least 1m (often much higher) and forward proto/port.

```diff
   listen 8443 ssl;
   http2 on;
 @@
-  proxy_read_timeout 30s;
-  proxy_send_timeout 30s;
+  proxy_read_timeout 5m;  # match / exceed REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS
+  proxy_send_timeout 1m;
 @@
   proxy_set_header Host $host;
+  proxy_set_header X-Forwarded-Proto $scheme;
+  proxy_set_header X-Forwarded-Port $server_port;
   proxy_set_header X-Forwarded-For $remote_addr;
```

Also applies to: 36-43
5-11: Note on http2 keepalive behavior.

Good call using keepalive_timeout; consider explicitly setting http2_max_concurrent_streams/requests if you hit limits under load.
apps/webapp/test/redisRealtimeStreams.test.ts (1)
1-1420: Consider extracting common test helpers.

The test suite is comprehensive and covers all critical scenarios well. However, there's significant duplication in test setup (creating streams, encoders, ingesting data, reading responses). Consider extracting helpers like `createTestStream(chunks)`, `ingestAndVerify(...)`, and `readSSEEvents(reader, count)` to reduce duplication and improve maintainability.
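One possible shape for the suggested `readSSEEvents` helper; the name comes from the comment above, and the framing assumes standard SSE (events separated by a blank line):

```ts
async function readSSEEvents(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  count: number
): Promise<string[]> {
  const decoder = new TextDecoder();
  const events: string[] = [];
  let buffer = "";

  while (events.length < count) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by "\n\n"
    let sep: number;
    while ((sep = buffer.indexOf("\n\n")) !== -1 && events.length < count) {
      events.push(buffer.slice(0, sep));
      buffer = buffer.slice(sep + 2);
    }
  }

  return events;
}
```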
createTestStream(chunks),ingestAndVerify(...), andreadSSEEvents(reader, count)to reduce duplication and improve maintainability.apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
326-401: Add safety limit to pagination loop in getLastChunkIndex.

The pagination loop starting at line 335 (`while (true)`) could theoretically run indefinitely if there's a bug in the pagination logic or if the Redis stream is extremely large. Add a safety limit to prevent runaway loops. Apply this diff:

```diff
 async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
   const redis = new Redis(this.options.redis ?? {});
   const streamKey = `stream:${runId}:${streamId}`;

   try {
     // Paginate through the stream from newest to oldest until we find this client's last chunk
     const batchSize = 100;
+    const maxIterations = 1000; // Prevent infinite loops (100k entries max)
+    let iterations = 0;
     let lastId = "+"; // Start from newest

     while (true) {
+      if (++iterations > maxIterations) {
+        this.logger.warn(
+          "[RedisRealtimeStreams][getLastChunkIndex] Pagination limit reached",
+          { streamKey, clientId, maxIterations }
+        );
+        return -1;
+      }
+
       const entries = await redis.xrevrange(streamKey, lastId, "-", "COUNT", batchSize);
```
136-167: Consider extracting inactivity check logic.

The inactivity timeout check is duplicated in two places (lines 136-150 and 152-167). Consider extracting this into a helper function to reduce duplication and improve maintainability.
Example:
```ts
const checkInactivity = () => {
  const inactiveMs = Date.now() - lastDataTime;
  if (inactiveMs >= this.inactivityTimeoutMs) {
    this.logger.debug(
      "[RealtimeStreams][streamResponse] Closing stream due to inactivity",
      { streamKey, inactiveMs, threshold: this.inactivityTimeoutMs }
    );
    controller.close();
    return true;
  }
  return false;
};
```

Then use `if (checkInactivity()) return;` in both locations.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (20)
- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
- `references/hello-world/src/trigger/realtime.ts` is excluded by `!references/**`
- `references/realtime-streams/.gitignore` is excluded by `!references/**`
- `references/realtime-streams/README.md` is excluded by `!references/**`
- `references/realtime-streams/next.config.ts` is excluded by `!references/**`
- `references/realtime-streams/package.json` is excluded by `!references/**`
- `references/realtime-streams/postcss.config.mjs` is excluded by `!references/**`
- `references/realtime-streams/public/file.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/globe.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/next.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/vercel.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/window.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/src/app/favicon.ico` is excluded by `!**/*.ico`, `!references/**`
- `references/realtime-streams/src/app/globals.css` is excluded by `!references/**`
- `references/realtime-streams/src/app/layout.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/app/page.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/components/streams.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/trigger/streams.ts` is excluded by `!references/**`
- `references/realtime-streams/trigger.config.ts` is excluded by `!references/**`
- `references/realtime-streams/tsconfig.json` is excluded by `!references/**`
📒 Files selected for processing (15)
- `.changeset/quiet-falcons-approve.md` (1 hunks)
- `apps/webapp/app/env.server.ts` (1 hunks)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts` (3 hunks)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts` (2 hunks)
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts` (6 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (0 hunks)
- `apps/webapp/app/services/realtime/types.ts` (2 hunks)
- `apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts` (1 hunks)
- `apps/webapp/test/redisRealtimeStreams.test.ts` (1 hunks)
- `docker/config/nginx.conf` (1 hunks)
- `docker/config/toxiproxy.json` (1 hunks)
- `docker/docker-compose.yml` (1 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (2 hunks)
- `packages/core/src/v3/runMetadata/metadataStream.ts` (7 hunks)
- `packages/core/test/metadataStream.test.ts` (1 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- apps/webapp/app/env.server.ts
- docker/config/toxiproxy.json
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts
- apps/webapp/app/services/realtime/types.ts
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
- `packages/core/src/v3/runMetadata/metadataStream.ts`
- `packages/core/test/metadataStream.test.ts`
- `packages/core/src/v3/apiClient/runStream.ts`
- `apps/webapp/test/redisRealtimeStreams.test.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
- `packages/core/src/v3/runMetadata/metadataStream.ts`
- `packages/core/test/metadataStream.test.ts`
- `packages/core/src/v3/apiClient/runStream.ts`
- `apps/webapp/test/redisRealtimeStreams.test.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
**/*.test.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Our tests are all vitest
Files:
- `packages/core/test/metadataStream.test.ts`
- `apps/webapp/test/redisRealtimeStreams.test.ts`
**/*.{test,spec}.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.{test,spec}.{ts,tsx,js,jsx}: Unit tests must use Vitest
Tests should avoid mocks or stubs and use helpers from @internal/testcontainers when Redis or Postgres are needed
Test files live beside the files under test and should use descriptive describe and it blocks
Files:
- `packages/core/test/metadataStream.test.ts`
- `apps/webapp/test/redisRealtimeStreams.test.ts`
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
- `apps/webapp/test/redisRealtimeStreams.test.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
{apps/webapp/**/__tests__/**/*.{ts,tsx},apps/webapp/**/*.{test,spec}.{ts,tsx}}: Do not import app/env.server.ts into tests, either directly or indirectly
Tests should only import classes/functions from files under apps/webapp/app/**/*.ts
Files:
apps/webapp/test/redisRealtimeStreams.test.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
🧬 Code graph analysis (6)
packages/core/test/metadataStream.test.ts (1)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
MetadataStream(25-437)
packages/core/src/v3/apiClient/runStream.ts (3)
packages/core/src/v3/apiClient/index.ts (1)
`headers` (1143-1154)

packages/core/src/v3/runMetadata/manager.ts (1)

`stream` (307-313)

packages/core/src/v3/runMetadata/index.ts (1)

`stream` (80-86)
apps/webapp/test/redisRealtimeStreams.test.ts (2)
internal-packages/testcontainers/src/index.ts (1)
`redisTest` (167-167)

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

`RedisRealtimeStreams` (24-402)
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
v1RealtimeStreams(20-20)
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (3)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
`v1RealtimeStreams` (20-20)

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)

`createLoaderApiRoute` (97-287)

apps/webapp/app/db.server.ts (1)

`$replica` (103-106)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
packages/core/src/logger.ts (3)
`Logger` (19-139), `LogLevel` (15-15), `error` (69-79)

apps/webapp/app/services/realtime/types.ts (2)

`StreamIngestor` (4-14), `StreamResponder` (17-25)
🪛 Biome (2.1.2)
packages/core/test/metadataStream.test.ts
[error] 792-795: This generator function doesn't contain yield.
(lint/correctness/useYield)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
305-315: Treat 502–504 and 408/429 as retryable; avoid retry for other 4xx.

You already do this; consider documenting it with a brief comment to prevent regressions.
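If a comment alone feels too loose, the policy can also live in a tiny predicate; a sketch under the invariant described above (the helper name is illustrative, not the file's current code):

```ts
// Retry only statuses that can plausibly succeed on a later attempt:
// 408 Request Timeout, 429 Too Many Requests, and all 5xx (incl. 502-504).
// Every other 4xx is treated as permanent and surfaces immediately.
function isRetryableStatus(status: number): boolean {
  return status === 408 || status === 429 || status >= 500;
}
```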
packages/core/test/metadataStream.test.ts (1)
791-799: Fix Biome error: async generator has no yield.

The `source` parameter accepts `AsyncIterable<T>` (confirmed in metadataStream.ts:10). The proposed fix adds an unreachable yield to satisfy Biome's `useYield` rule while maintaining correct semantics (no chunks generated). This pattern is appropriate for a test utility where the intent is to verify empty stream handling.

```diff
-  async function* generateChunks() {
-    // Yields nothing
-    return;
-  }
+  async function* generateChunks() {
+    // Intentionally no chunks; keep one unreachable yield to satisfy linters
+    if (false) { yield undefined as any; }
+  }
```

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
65-69: Original review comment is incorrect; the ping logic is sound and prevents redundant pings.

The code correctly avoids rapid pings through the `timeSinceLastEnqueue >= pingIntervalMs` check combined with updating `lastEnqueueTime` after each ping. Timeline:

- At t=10s: first ping sent, `lastEnqueueTime` updated
- At t=15s: `timeSinceLastEnqueue` = 5s (< 10s), no ping
- At t=20s: `timeSinceLastEnqueue` = 10s (>= 10s), next ping sent

Even if XREAD BLOCK (5s timeout) returns multiple times in succession, the 10-second threshold ensures pings are sent at most every 10 seconds. Additionally, when data is enqueued (line 124), `lastEnqueueTime` is updated, preventing unnecessary pings during active data flow.

No changes needed.
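A minimal sketch of the gating described in that timeline, with `lastEnqueueTime` and `pingIntervalMs` mirroring the discussion and `enqueue` standing in for the controller write:

```ts
let lastEnqueueTime = Date.now();
const pingIntervalMs = 10_000;

function maybeEnqueuePing(enqueue: (chunk: string) => void): void {
  const timeSinceLastEnqueue = Date.now() - lastEnqueueTime;
  if (timeSinceLastEnqueue >= pingIntervalMs) {
    // SSE comment line: keeps the connection alive, invisible to EventSource clients
    enqueue(": ping\n\n");
    lastEnqueueTime = Date.now();
  }
}
```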
```yaml
  nginx-h2:
    image: nginx:1.27
    container_name: nginx-h2
    restart: unless-stopped
    ports:
      - "8443:8443"
    volumes:
      - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./config/certs:/etc/nginx/certs:ro
```
Add networks configuration to nginx-h2 service.
The nginx-h2 service is missing a networks configuration block. All other services explicitly join app_network, and nginx-h2 should as well to communicate with other services.
Apply this diff:
```diff
   nginx-h2:
     image: nginx:1.27
     container_name: nginx-h2
     restart: unless-stopped
     ports:
       - "8443:8443"
     volumes:
       - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
       - ./config/certs:/etc/nginx/certs:ro
+    networks:
+      - app_network
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```yaml
  nginx-h2:
    image: nginx:1.27
    container_name: nginx-h2
    restart: unless-stopped
    ports:
      - "8443:8443"
    volumes:
      - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./config/certs:/etc/nginx/certs:ro
    networks:
      - app_network
```
🤖 Prompt for AI Agents
In docker/docker-compose.yml around lines 157 to 165 the nginx-h2 service is
missing a networks block; add a networks: section with - app_network (matching
other services) so nginx-h2 joins the app_network, ensuring consistency with the
rest of the compose file and enabling communication with other containers.
```ts
private async connectStream(controller: ReadableStreamDefaultController): Promise<void> {
  try {
    const headers: Record<string, string> = {
      Accept: "text/event-stream",
      ...this.options.headers,
    };

    // Include Last-Event-ID header if we're resuming
    if (this.lastEventId) {
      headers["Last-Event-ID"] = this.lastEventId;
    }

    const response = await fetch(this.url, {
      headers,
      signal: this.options.signal,
    });

    if (!response.ok) {
      throw ApiError.generate(
```
Don’t retry non‑retryable 4xx; surface error instead of burning attempts.
Currently any non-OK response leads to retries. Avoid retrying 4xx (except 408/429).
```diff
-  if (!response.ok) {
-    throw ApiError.generate(
+  if (!response.ok) {
+    const err = ApiError.generate(
       response.status,
       {},
       "Could not subscribe to stream",
       Object.fromEntries(response.headers)
-    );
+    );
+    // Retry only on 408/429/5xx. Abort for other 4xx.
+    if (err.status && err.status < 500 && err.status !== 408 && err.status !== 429) {
+      throw err;
+    }
+    // fall through to catch; retryConnection will handle
+    throw err;
   }
```
}And in catch, short‑circuit:
```diff
-  // Retry on error
-  await this.retryConnection(controller, error as Error);
+  const e = error as any;
+  if (e?.status && e.status < 500 && e.status !== 408 && e.status !== 429) {
+    controller.error(e);
+    return;
+  }
+  await this.retryConnection(controller, e);
```
+ await this.retryConnection(controller, e);Also applies to: 271-273
…ing resumable streams. We also will now send invisible "ping" packets to keep connected clients alive when there are no real data packets to send, which will be especially helpful to older clients
Force-pushed d2179cd to 3258134 (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (3)
docker/docker-compose.yml (1)
157-165: Add networks configuration to nginx-h2 service.

The `nginx-h2` service is missing a `networks` configuration block. All other services explicitly join `app_network`, and nginx-h2 should as well to communicate with other services in the compose environment. Apply this diff:

```diff
   nginx-h2:
     image: nginx:1.27
     container_name: nginx-h2
     restart: unless-stopped
     ports:
       - "8443:8443"
     volumes:
       - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
       - ./config/certs:/etc/nginx/certs:ro
+    networks:
+      - app_network
```

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (2)
15-32: HEAD handling inside action won't run in Remix; move it to the loader.

Remix routes HEAD to the loader, not the action. The current branch is dead code, breaking resume discovery.

```diff
 export async function action({ request, params }: ActionFunctionArgs) {
   const $params = ParamsSchema.parse(params);

-  // Extract client ID from header, default to "default" if not provided
-  const clientId = request.headers.get("X-Client-Id") || "default";
-
-  // Handle HEAD request to get last chunk index for this client
-  if (request.method === "HEAD") {
-    const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
-      $params.runId,
-      $params.streamId,
-      clientId
-    );
-
-    return new Response(null, {
-      status: 200,
-      headers: {
-        "X-Last-Chunk-Index": lastChunkIndex.toString(),
-      },
-    });
-  }
+  const clientId = request.headers.get("X-Client-Id") ?? "default";
```

And add a HEAD branch in the loader before streaming:

```diff
-  async ({ params, request, resource: run, authentication }) => {
-    // Get Last-Event-ID header for resuming from a specific position
+  async ({ params, request, resource: run, authentication }) => {
+    if (request.method === "HEAD") {
+      const clientId = request.headers.get("X-Client-Id") ?? "default";
+      const lastChunkIndex = await v1RealtimeStreams.getLastChunkIndex(
+        run.friendlyId,
+        params.streamId,
+        clientId
+      );
+      return new Response(null, {
+        status: 200,
+        headers: { "X-Last-Chunk-Index": String(lastChunkIndex) },
+      });
+    }
+
+    // Get Last-Event-ID header for resuming from a specific position
```

```bash
#!/bin/bash
# Verify HEAD is only handled in loader for this route
rg -n -C2 'request\.method\s*===\s*"HEAD"' apps/webapp/app/routes/realtime.v1.streams.\$runId.\$streamId.ts
```
38-40: Validate X-Resume-From-Chunk; guard against NaN/negative.

`parseInt` can yield `NaN`; negative values should be rejected.

```diff
-  const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
-  const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+  const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+  let resumeFromChunkNumber: number | undefined;
+  if (resumeFromChunkHeader !== null) {
+    const n = Number(resumeFromChunkHeader);
+    if (!Number.isInteger(n) || n < 0) {
+      return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+    }
+    resumeFromChunkNumber = n;
+  }
```
🧹 Nitpick comments (6)
apps/webapp/app/services/realtime/types.ts (2)
1-1: Remove unused import.

`AuthenticatedEnvironment` isn't used anymore. Drop it to avoid dead code.

```diff
-import { AuthenticatedEnvironment } from "../apiAuth.server";
+// (removed unused import)
```
4-14: Prefer `type` aliases over `interface` per guidelines.

Our TS style favors `type` for shapes here; it also keeps the public surface consistent across packages. As per coding guidelines.

```diff
-export interface StreamIngestor {
-  ingestData(
-    stream: ReadableStream<Uint8Array>,
-    runId: string,
-    streamId: string,
-    clientId: string,
-    resumeFromChunk?: number
-  ): Promise<Response>;
-
-  getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
-}
+export type StreamIngestor = {
+  ingestData(
+    stream: ReadableStream<Uint8Array>,
+    runId: string,
+    streamId: string,
+    clientId: string,
+    resumeFromChunk?: number
+  ): Promise<Response>;
+  getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
+};
 @@
-export interface StreamResponder {
-  streamResponse(
-    request: Request,
-    runId: string,
-    streamId: string,
-    signal: AbortSignal,
-    lastEventId?: string
-  ): Promise<Response>;
-}
+export type StreamResponder = {
+  streamResponse(
+    request: Request,
+    runId: string,
+    streamId: string,
+    signal: AbortSignal,
+    lastEventId?: string
+  ): Promise<Response>;
+};
```

Also applies to: 16-25
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (4)
304-306: Keep stream TTL warm during long ingests.

TTL is only set at the end; long-running streams may never expire if interrupted. Periodically refresh TTL during writes.

```diff
   await redis.xadd(
     streamKey,
 @@
     value
   );

+  // Periodically refresh TTL to ensure cleanup even for long-lived streams
+  if (currentChunkIndex % 50 === 0) {
+    await redis.expire(streamKey, env.REALTIME_STREAM_TTL);
+  }
+
   currentChunkIndex++;
 @@
-  // Set TTL for cleanup when stream is done
+  // Final TTL set on completion
   await redis.expire(streamKey, env.REALTIME_STREAM_TTL);
```
356-366: Harden `chunkIndex` parsing.

Ensure the parsed value is an integer ≥ 0 before using it.

```diff
-  if (fields[i] === "chunkIndex") {
-    chunkIndex = parseInt(fields[i + 1], 10);
-  }
+  if (fields[i] === "chunkIndex") {
+    const n = Number(fields[i + 1]);
+    if (Number.isInteger(n) && n >= 0) {
+      chunkIndex = n;
+    }
+  }
```
235-240: Disable proxy buffering for SSE.

Add `X-Accel-Buffering: no` to avoid Nginx buffering SSE.

```diff
   return new Response(stream, {
     headers: {
       "Content-Type": "text/event-stream",
       "Cache-Control": "no-cache",
       Connection: "keep-alive",
+      "X-Accel-Buffering": "no",
     },
   });
```
3-3: Remove unused import.

`LineTransformStream` isn't used after the refactor.

```diff
-import { LineTransformStream } from "./utils.server";
+// (removed unused import)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (19)
- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
- `references/realtime-streams/.gitignore` is excluded by `!references/**`
- `references/realtime-streams/README.md` is excluded by `!references/**`
- `references/realtime-streams/next.config.ts` is excluded by `!references/**`
- `references/realtime-streams/package.json` is excluded by `!references/**`
- `references/realtime-streams/postcss.config.mjs` is excluded by `!references/**`
- `references/realtime-streams/public/file.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/globe.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/next.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/vercel.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/public/window.svg` is excluded by `!**/*.svg`, `!references/**`
- `references/realtime-streams/src/app/favicon.ico` is excluded by `!**/*.ico`, `!references/**`
- `references/realtime-streams/src/app/globals.css` is excluded by `!references/**`
- `references/realtime-streams/src/app/layout.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/app/page.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/components/streams.tsx` is excluded by `!references/**`
- `references/realtime-streams/src/trigger/streams.ts` is excluded by `!references/**`
- `references/realtime-streams/trigger.config.ts` is excluded by `!references/**`
- `references/realtime-streams/tsconfig.json` is excluded by `!references/**`
📒 Files selected for processing (8)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts` (3 hunks)
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts` (6 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (0 hunks)
- `apps/webapp/app/services/realtime/types.ts` (2 hunks)
- `apps/webapp/test/redisRealtimeStreams.test.ts` (1 hunks)
- `docker/config/nginx.conf` (1 hunks)
- `docker/docker-compose.yml` (1 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (2 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🚧 Files skipped from review as they are similar to previous changes (3)
- packages/core/src/v3/apiClient/runStream.ts
- docker/config/nginx.conf
- apps/webapp/test/redisRealtimeStreams.test.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
- `apps/webapp/app/services/realtime/types.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
- `apps/webapp/app/services/realtime/types.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
- `apps/webapp/app/services/realtime/types.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
- `apps/webapp/app/services/realtime/types.ts`
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts`
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts`
🧬 Code graph analysis (2)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
packages/core/src/logger.ts (3)
`Logger` (19-139), `LogLevel` (15-15), `error` (69-79)

apps/webapp/app/services/realtime/types.ts (2)

`StreamIngestor` (4-14), `StreamResponder` (17-25)
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1)
v1RealtimeStreams(20-20)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
docker/docker-compose.yml (1)
144-155: LGTM!

The toxiproxy service is well-configured with proper network integration, volume mounts for configuration, ports for both the proxy and API, and a clear command invocation.
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
82-91: SSE resume wiring confirmed correct.

Code inspection verifies the implementation: the Last-Event-ID header is extracted and passed to `streamResponse()`, which uses it to resume from the correct Redis stream position via `redis.xread(lastId)`. SSE event IDs are properly set with the Redis stream ID (line 219), enabling client-side resume capability. The `request.signal` is correctly threaded for abort handling. Test coverage for resumption exists ("Should support resuming from Last-Event-ID").

Manual end-to-end verification with curl can be performed in your environment with actual run and stream IDs.
```ts
// Write each chunk with its index and clientId
this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
  streamKey,
  runId,
  clientId,
  chunkIndex: currentChunkIndex,
  resumeFromChunk: startChunk,
  value,
});
```
Avoid logging raw chunk data; potential PII leakage.
value may include sensitive payloads. Log size/indices instead.
```diff
-  this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
+  this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
     streamKey,
     runId,
     clientId,
     chunkIndex: currentChunkIndex,
     resumeFromChunk: startChunk,
-    value,
+    size: value.length,
   });
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
// Write each chunk with its index and clientId
this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
  streamKey,
  runId,
  clientId,
  chunkIndex: currentChunkIndex,
  resumeFromChunk: startChunk,
  size: value.length,
});
```
🤖 Prompt for AI Agents
In apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts around lines
277 to 285, the debug log currently includes the raw chunk payload in the value
field which can leak PII; replace that raw payload with non-sensitive metadata
such as the chunk size (byte length or string length), chunk index, and any
resume/start indices, and remove or redact the actual content. Update the logger
call to omit value (or set valueSize/shape instead) so only safe metadata is
logged.
… the stream keys. Also added a new db:seed script to seed a fresh database for local development with reference projects setup
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
packages/core/src/v3/runMetadata/manager.ts (2)
409-431: Timeout handling is ineffective; streams aren't aborted on timeout.

The timeout Promise resolves, so catch never runs. The function returns even if streams are still running.
Apply:
```diff
 public async waitForAllStreams(timeout: number = 60_000): Promise<void> {
   if (this.activeStreams.size === 0) {
     return;
   }

   const promises = Array.from(this.activeStreams.values()).map((stream) => stream.wait());

   try {
-    await Promise.race([
-      Promise.allSettled(promises),
-      new Promise<void>((resolve, _) => setTimeout(() => resolve(), timeout)),
-    ]);
+    const timeoutErr = new Promise<never>((_, reject) =>
+      setTimeout(() => reject(new Error(`Timed out after ${timeout}ms waiting for streams`)), timeout)
+    );
+    await Promise.race([Promise.allSettled(promises), timeoutErr]);
   } catch (error) {
     console.error("Error waiting for streams to finish:", error);
     // If we time out, abort all remaining streams
-    for (const [key, promise] of this.activeStreams.entries()) {
-      // We can add abort logic here if needed
-      this.activeStreams.delete(key);
-    }
+    for (const key of this.activeStreams.keys()) {
+      // Optional: add AbortSignal support to StreamInstance in future to cancel here
+      this.activeStreams.delete(key);
+    }
     throw error;
   }
 }
```
362-369: Update `$$streams` when a stream is created, or use local tracking for the guard check.

The guard check at line 362 reads `$$streams`, but the manager never updates it when creating streams. Streams are tracked locally in the `activeStreams` Map (line 397), making the `MAXIMUM_TOTAL_STREAMS` check ineffective unless `$$streams` is maintained by an external mechanism.

Replace the check with:

```ts
if (this.activeStreams.size >= MAXIMUM_TOTAL_STREAMS) {
```

Or ensure `$$streams` is updated when a stream is added by calling:

```ts
this.set("$$streams", [...(this.store?.$$streams ?? []), key]);
```

after creating the stream.
packages/trigger-sdk/src/v3/shared.ts (3)
433-455: Accept TriggerApiRequestOptions to allow clientConfig propagation.

Public triggerAndWait() uses ApiRequestOptions, so clientConfig can't be passed through.
Apply:
```diff
-export function triggerAndWait<TTask extends AnyTask>(
+export function triggerAndWait<TTask extends AnyTask>(
   id: TaskIdentifier<TTask>,
   payload: TaskPayload<TTask>,
   options?: TriggerAndWaitOptions,
-  requestOptions?: ApiRequestOptions
+  requestOptions?: TriggerApiRequestOptions
 ): TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>> {
```
481-492: Same issue for batchTriggerAndWait(): widen requestOptions.

Use TriggerApiRequestOptions so callers can pass clientConfig.
Apply:
```diff
-export async function batchTriggerAndWait<TTask extends AnyTask>(
+export async function batchTriggerAndWait<TTask extends AnyTask>(
   id: TaskIdentifier<TTask>,
   items: Array<BatchItem<TaskPayload<TTask>>>,
   options?: BatchTriggerAndWaitOptions,
-  requestOptions?: ApiRequestOptions
+  requestOptions?: TriggerApiRequestOptions
 ): Promise<BatchResult<TaskIdentifier<TTask>, TaskOutput<TTask>>> {
```
1275-1277: Span attribute guard uses Array.isArray on runCount (number).

This condition never fires; use a typeof check.
Apply:
- if ("runCount" in body && Array.isArray(body.runCount)) { + if ("runCount" in body && typeof (body as any).runCount === "number") {
♻️ Duplicate comments (7)
packages/core/src/v3/apiClient/runStream.ts (1)
206-214: Don't retry non-retryable 4xx; surface the error.

Short-circuit retries for 4xx except 408/429 to avoid burning attempts.

```diff
-  if (!response.ok) {
-    throw ApiError.generate(
+  if (!response.ok) {
+    const err = ApiError.generate(
       response.status,
       {},
       "Could not subscribe to stream",
       Object.fromEntries(response.headers)
-    );
+    );
+    // Retry only on 408/429/5xx. Abort for other 4xx.
+    if (err.status && err.status < 500 && err.status !== 408 && err.status !== 429) {
+      throw err;
+    }
+    throw err;
   }
```

And in catch:

```diff
-  // Retry on error
-  await this.retryConnection(controller, error as Error);
+  const e = error as any;
+  if (e?.status && e.status < 500 && e.status !== 408 && e.status !== 429) {
+    controller.error(e);
+    return;
+  }
+  await this.retryConnection(controller, e);
```

Also applies to: 260-269
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2)
96-98: Validate X-Resume-From-Chunk to avoid NaN/negatives.

parseInt can yield NaN; guard and return 400 on an invalid header.
Apply:
```diff
-  const resumeFromChunk = request.headers.get("X-Resume-From-Chunk");
-  const resumeFromChunkNumber = resumeFromChunk ? parseInt(resumeFromChunk, 10) : undefined;
+  const resumeFromChunkHeader = request.headers.get("X-Resume-From-Chunk");
+  const n = resumeFromChunkHeader !== null ? Number(resumeFromChunkHeader) : undefined;
+  if (n !== undefined && (!Number.isInteger(n) || n < 0)) {
+    return new Response("Invalid X-Resume-From-Chunk", { status: 400 });
+  }
+  const resumeFromChunkNumber = n;
```
112-139: Add authorization and allowJWT parity to the loader.

Align with the sibling SSE route: enforce scopes/resources and enable allowJWT if intended.
Suggested:
```diff
-  {
-    params: ParamsSchema,
-    allowJWT: false,
-    corsStrategy: "none",
-    findResource: async (params, authentication) => {
+  {
+    params: ParamsSchema,
+    allowJWT: true,
+    corsStrategy: "none",
+    authorization: {
+      action: "read",
+      resource: (run) => ({
+        runs: run.friendlyId,
+        tags: run.runTags,
+        batch: run.batch?.friendlyId,
+        tasks: run.taskIdentifier,
+      }),
+      superScopes: ["read:runs", "read:all", "admin"],
+    },
+    findResource: async (params, authentication) => {
```

Also extend select/include to provide fields referenced by authorization if not already present. Based on coding guidelines.
packages/core/src/v3/runMetadata/metadataStream.ts (3)
80-103: Handle reader errors and always release the lock.

Same issue previously flagged: an unhandled rejection and a locked reader on error. Add finally + catch to avoid leaks.
Apply:
```diff
 private startBuffering(): void {
   this.streamReader = this.serverStream.getReader();

   this.bufferReaderTask = (async () => {
     try {
       let chunkIndex = 0;

       while (true) {
         const { done, value } = await this.streamReader!.read();

         if (done) {
           this.streamComplete = true;
           break;
         }

         // Add to ring buffer
         this.addToRingBuffer(chunkIndex, value);
         this.highestBufferedIndex = chunkIndex;
         chunkIndex++;
       }
-    } catch (error) {
-      throw error;
-    }
-  })();
+    } catch (error) {
+      this.streamComplete = true;
+    } finally {
+      try { this.streamReader?.releaseLock(); } catch {}
+    }
+  })().catch(() => {});
 }
```
152-169: Timeout should set ETIMEDOUT to trigger retry logic.

Previously raised; the same fix applies.
Apply:
- req.on("timeout", async () => { + req.on("timeout", async () => { // Timeout is retryable if (this.retryCount < this.maxRetries) { this.retryCount++; const delayMs = this.calculateBackoffDelay(); await this.delay(delayMs); // Query server to find where to resume const serverLastChunk = await this.queryServerLastChunkIndex(); const resumeFromChunk = serverLastChunk + 1; - resolve(this.makeRequest(resumeFromChunk)); + const err: any = new Error("Request timed out"); + err.code = "ETIMEDOUT"; + try { req.destroy(err); } catch {} + resolve(this.makeRequest(resumeFromChunk)); return; } - reject(new Error("Request timed out")); + const err: any = new Error("Request timed out"); + err.code = "ETIMEDOUT"; + reject(err); });
126-147: Stop the old writer before retry and honor backpressure.

As noted earlier, the writer loop can continue after errors, and req.write() ignores backpressure; add a closed flag and await 'drain'.
Apply:
- req.on("error", async (error) => { + let requestClosed = false; + const markClosed = () => { requestClosed = true; }; + req.once("error", markClosed); + req.once("close", markClosed); + req.once("finish", markClosed); + + req.on("error", async (error) => { // Check if this is a retryable connection error if (this.isRetryableError(error)) { if (this.retryCount < this.maxRetries) { this.retryCount++; const delayMs = this.calculateBackoffDelay(); await this.delay(delayMs); // Query server to find out what the last chunk it received was const serverLastChunk = await this.queryServerLastChunkIndex(); // Resume from the next chunk after what the server has const resumeFromChunk = serverLastChunk + 1; - resolve(this.makeRequest(resumeFromChunk)); + try { req.destroy(error as any); } catch {} + requestClosed = true; + resolve(this.makeRequest(resumeFromChunk)); return; } } reject(error); }); @@ const processStream = async () => { try { let lastSentIndex = startFromChunk - 1; while (true) { + if (requestClosed) break; // Send all chunks that are available in buffer while (lastSentIndex < this.highestBufferedIndex) { lastSentIndex++; - const chunk = this.ringBuffer.find((c) => c.index === lastSentIndex); + if (requestClosed) break; + const slot = this.ringBuffer[lastSentIndex % this.maxBufferSize]; + const chunk = slot && slot.index === lastSentIndex ? slot : undefined; if (chunk) { const stringified = JSON.stringify(chunk.data) + "\n"; - req.write(stringified); + const ok = req.write(stringified); + if (!ok) { + await new Promise<void>((r) => req.once("drain", r)); + if (requestClosed) break; + } this.currentChunkIndex = lastSentIndex + 1; } } // If stream is complete and we've sent all buffered chunks, we're done - if (this.streamComplete && lastSentIndex >= this.highestBufferedIndex) { + if ((this.streamComplete && lastSentIndex >= this.highestBufferedIndex) || requestClosed) { req.end(); break; } // Wait a bit for more chunks to be buffered await this.delay(10); } } catch (error) { reject(error); } };Also applies to: 217-233, 234-242
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
283-291: CRITICAL: Raw chunk data still logged (PII leakage risk).

This was flagged in a previous review but remains unresolved. The `value` field may contain sensitive user data (PII). Logging raw chunk payloads violates privacy best practices.

Apply the previously suggested fix:

```diff
   this.logger.debug("[RedisRealtimeStreams][ingestData] Writing chunk", {
     streamKey,
     runId,
     clientId,
     chunkIndex: currentChunkIndex,
     resumeFromChunk: startChunk,
-    value,
+    size: value.length,
   });
```
🧹 Nitpick comments (19)
apps/webapp/seed.mts (1)
119-121: Update instructions to reflect actual project refs from the database.

The hardcoded instructions may become stale if project refs change. Consider querying the actual refs from the created projects and displaying them dynamically:

```diff
-  console.log("\n⚠️  Note: Update the .env files in d3-chat and realtime-streams with:");
-  console.log(`   - d3-chat: TRIGGER_PROJECT_REF=proj_cdmymsrobxmcgjqzhdkq`);
-  console.log(`   - realtime-streams: TRIGGER_PROJECT_REF=proj_klxlzjnzxmbgiwuuwhvb`);
+  console.log("\n⚠️  Note: Update the .env files with the following project refs:");
+  for (const projectConfig of referenceProjects) {
+    const proj = await prisma.project.findUnique({
+      where: { externalRef: projectConfig.externalRef },
+      select: { name: true, externalRef: true },
+    });
+    if (proj) {
+      console.log(`   - ${proj.name}: TRIGGER_PROJECT_REF=${proj.externalRef}`);
+    }
+  }
```
752-756: Constrain stream version via enum (optional now, safer long-term).

Using a string for realtimeStreamsVersion is flexible but permits invalid values. Prefer a Prisma enum (e.g., V1 | S2) with a default, then migrate.
Example:
```diff
+enum RealtimeStreamsVersion {
+  V1
+  S2
+}
 ...
-  realtimeStreamsVersion String @default("v1")
+  realtimeStreamsVersion RealtimeStreamsVersion @default(V1)
```

If you plan to filter by realtimeStreams, consider adding an index later (e.g., GIN) when the query need arises.
apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (1)
36-39: Validate x-trigger-realtime-streams-version against allowed values.

Restrict to known versions to avoid typos and unexpected DB values; keep defaulting behavior via Prisma.

```diff
-export const HeadersSchema = z.object({
+export const HeadersSchema = z.object({
   ...
-  "x-trigger-realtime-streams-version": z.string().nullish(),
+  "x-trigger-realtime-streams-version": z
+    .enum(["v1", "s2"])
+    .nullish(),
 });
 ...
   "x-trigger-realtime-streams-version": realtimeStreamsVersion,
 ...
-  realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
+  realtimeStreamsVersion: realtimeStreamsVersion ?? undefined,
```

Optionally, lower-case before the enum if clients may vary casing:

```ts
const normalized = realtimeStreamsVersion?.toLowerCase() as "v1" | "s2" | undefined;
```

Also applies to: 67-68, 113-114, 129-131
apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2)
19-20: Provide a sane default for inactivityTimeoutMs.

If the env var is unset, pass a default (e.g., 5 minutes) to avoid surprising behavior.

```diff
-  inactivityTimeoutMs: env.REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS,
+  inactivityTimeoutMs: env.REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS ?? 5 * 60_000,
```

As per coding guidelines (apps/webapp), env access is correct.
25-51: Avoid recreating S2RealtimeStreams on every call (optional).

S2RealtimeStreams is stateless-ish, but repeated instantiation is unnecessary. Cache per environment to reduce churn.

```diff
+const s2Instances = new Map<string, S2RealtimeStreams>();

 export function getRealtimeStreamInstance(
   environment: AuthenticatedEnvironment,
   streamVersion: string
 ): StreamIngestor & StreamResponder {
   if (streamVersion === "v1") {
     return v1RealtimeStreams;
   } else {
     if (env.REALTIME_STREAMS_S2_BASIN && env.REALTIME_STREAMS_S2_ACCESS_TOKEN) {
-      return new S2RealtimeStreams({
+      const key = `${environment.organization.id}:${environment.id}`;
+      let inst = s2Instances.get(key);
+      if (!inst) {
+        inst = new S2RealtimeStreams({
           basin: env.REALTIME_STREAMS_S2_BASIN,
           accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN,
           streamPrefix: [
             "org",
             environment.organization.id,
             "env",
             environment.slug,
             environment.id,
           ].join("/"),
           logLevel: env.REALTIME_STREAMS_S2_LOG_LEVEL,
           flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS,
           maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES,
-      });
+        });
+        s2Instances.set(key, inst);
+      }
+      return inst;
     }

     return v1RealtimeStreams;
   }
 }
```

Optionally plumb S2 heartbeat/wait settings from env into S2RealtimeStreams for full configurability.
apps/webapp/app/services/realtime/types.ts (2)
3-7: Clarify initialize semantics and return type usage.

Consider documenting whether initializeStream is idempotent and when responseHeaders should be sent (e.g., only on first create vs every PUT). Add brief JSDoc to avoid divergent implementations.
11-16: Be explicit about resume index semantics.

Define whether resumeFromChunk is inclusive or exclusive and guarantee non-negative integers. Types alone can't enforce this; add JSDoc to standardize implementors and callers. A sketch of that documentation follows.
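A documentation sketch covering both points, reusing the StreamIngestor shape shown earlier in this review; the exact wording is a suggestion, not the file's current code:

```ts
export type StreamIngestor = {
  /**
   * Ingests a client's stream. Should be safe to call again after a network
   * failure: chunks below `resumeFromChunk` are assumed already persisted.
   * @param resumeFromChunk Inclusive index of the first chunk carried by
   *   `stream`; must be a non-negative integer (defaults to 0).
   */
  ingestData(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string,
    clientId: string,
    resumeFromChunk?: number
  ): Promise<Response>;

  /** Returns the highest chunk index persisted for this client, or -1 if none. */
  getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
};
```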
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (3)
57-72: Make stream registration idempotent (avoid duplicate push).

Repeated PUTs can duplicate streamId in realtimeStreams. Prefer a uniqueness check before push, or compute a unique set and set it.
Example approach:
```diff
-  const updatedRun = await prisma.taskRun.update({
+  const updatedRun = await prisma.$transaction(async (tx) => {
+    const current = await tx.taskRun.findUnique({
+      where: { friendlyId_runtimeEnvironmentId: { friendlyId: targetId, runtimeEnvironmentId: authentication.environment.id } },
+      select: { realtimeStreams: true, realtimeStreamsVersion: true },
+    });
+    const next = Array.from(new Set([...(current?.realtimeStreams ?? []), params.streamId]));
+    return tx.taskRun.update({
       where: {
         friendlyId: targetId,
         runtimeEnvironmentId: authentication.environment.id,
       },
       data: {
-        realtimeStreams: {
-          push: params.streamId,
-        },
+        realtimeStreams: { set: next },
       },
       select: {
         realtimeStreamsVersion: true,
       },
-  });
+    });
+  });
```
88-94: Restrict ingestion to POST (or specific methods).

Currently any non-PUT method routes to ingest; explicitly require POST to reduce surprises.

```diff
-  } else {
+  } else if (request.method === "POST") {
     // ...
-  }
+  } else {
+    return new Response("Method Not Allowed", { status: 405 });
+  }
```
89-91: Sanitize X-Client-Id.

Constrain length and charset to prevent key/path abuse downstream (e.g., Redis key construction).

```diff
-  const clientId = request.headers.get("X-Client-Id") || "default";
+  const rawClientId = request.headers.get("X-Client-Id") || "default";
+  const clientId = /^[A-Za-z0-9:_\-]{1,128}$/.test(rawClientId) ? rawClientId : "default";
```

packages/core/src/v3/apiClient/runStream.ts (2)
239-253: Close the controller on normal completion.

When done is true, close the controller to signal EOF to consumers.

```diff
-  if (done) {
-    break;
-  }
+  if (done) {
+    controller.close();
+    break;
+  }
```
286-299: Expose retry/backoff as options.

Make maxRetries/retryDelayMs configurable via constructor options to harmonize with server/client defaults.

```diff
-  private maxRetries = 5;
-  private retryDelayMs = 1000;
+  private maxRetries: number;
+  private retryDelayMs: number;

   constructor(
     private url: string,
-    private options: { headers?: Record<string, string>; signal?: AbortSignal }
-  ) {}
+    private options: { headers?: Record<string, string>; signal?: AbortSignal; maxRetries?: number; baseDelayMs?: number }
+  ) {
+    this.maxRetries = options.maxRetries ?? 5;
+    this.retryDelayMs = options.baseDelayMs ?? 1000;
+  }
```

And plumb through from SSEStreamSubscriptionFactory.
apps/webapp/app/env.server.ts (2)
201-205: Default inactivity timeout: OK.

Sane default (5m). Consider documenting expected head/heartbeat behavior in the implementing stream responder.
1208-1215: Validate S2 config cohesion.

If any S2 var is set, require both BASIN and ACCESS_TOKEN; use a small refine or discriminated union to prevent half-configured deployments.
Example:
```diff
-const EnvironmentSchema = z.object({ /* ... */
-  REALTIME_STREAMS_S2_BASIN: z.string().optional(),
-  REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
-  /* ... */
-})
+const S2Schema = z.preprocess(
+  (v) => v,
+  z.union([
+    z.object({ REALTIME_STREAMS_S2_BASIN: z.string(), REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string() }),
+    z.object({ REALTIME_STREAMS_S2_BASIN: z.undefined().optional(), REALTIME_STREAMS_S2_ACCESS_TOKEN: z.undefined().optional() }),
+  ])
+);
+const EnvironmentSchema = z.object({ /* ... */ }).and(S2Schema);
```
223-231: Use O(1) ring buffer access; remove unused helpers.The per‑chunk search is O(n). You already maintain a ring; index by modulo and drop unused getChunksFromBuffer()/bufferStartIndex if not needed.
Apply:
```diff
-  const chunk = this.ringBuffer.find((c) => c.index === lastSentIndex);
+  const slot = this.ringBuffer[lastSentIndex % this.maxBufferSize];
+  const chunk = slot && slot.index === lastSentIndex ? slot : undefined;
```

And consider deleting the unused getChunksFromBuffer() and bufferStartIndex fields to reduce complexity.
Also applies to: 323-335, 337-349
27-44: Unused AbortController field.
private controller is never used; remove it or wire it up to support cancellation.
apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (3)
137-137: Remove citation artifact from comment.
The comment contains a leftover citation reference that should be removed.
```diff
-  wait: this.s2WaitSeconds, // long polling for new data. :contentReference[oaicite:6]{index=6}
+  wait: this.s2WaitSeconds, // long polling for new data
```
225-225: Remove citation artifact from comment.
The comment contains a leftover citation reference that should be removed.
```diff
-  // GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads). :contentReference[oaicite:9]{index=9}
+  // GET /v1/streams/{stream}/records?... (supports wait= for long-poll; linearizable reads)
```
251-257: Consider clarifying the lastEventId format handling.
The method defensively handles Redis-style IDs (`timestamp-sequence`) by taking the leading digits, but S2 streams use plain sequence numbers. Since S2 and Redis streams don't interoperate, this fallback may never occur.
Consider adding a comment explaining the format:
```diff
 private parseLastEventId(lastEventId?: string): number | undefined {
   if (!lastEventId) return undefined;
-  // tolerate formats like "1699999999999-5" (take leading digits)
+  // S2 event IDs are plain sequence numbers (e.g., "42")
+  // Defensively handle Redis format ("timestamp-sequence") by taking leading digits
   const digits = lastEventId.split("-")[0];
   const n = Number(digits);
   return Number.isFinite(n) && n >= 0 ? n + 1 : undefined;
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (13)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
- references/realtime-streams/PERFORMANCE_TESTING.md is excluded by !references/**
- references/realtime-streams/TESTING.md is excluded by !references/**
- references/realtime-streams/package.json is excluded by !references/**
- references/realtime-streams/src/app/actions.ts is excluded by !references/**
- references/realtime-streams/src/app/globals.css is excluded by !references/**
- references/realtime-streams/src/app/page.tsx is excluded by !references/**
- references/realtime-streams/src/app/performance/[runId]/page.tsx is excluded by !references/**
- references/realtime-streams/src/app/runs/[runId]/page.tsx is excluded by !references/**
- references/realtime-streams/src/components/performance-monitor.tsx is excluded by !references/**
- references/realtime-streams/src/components/streams.tsx is excluded by !references/**
- references/realtime-streams/src/components/trigger-button.tsx is excluded by !references/**
- references/realtime-streams/src/trigger/streams.ts is excluded by !references/**
📒 Files selected for processing (36)
- apps/webapp/app/env.server.ts (2 hunks)
- apps/webapp/app/models/organization.server.ts (1 hunks)
- apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (4 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (2 hunks)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (2 hunks)
- apps/webapp/app/runEngine/services/triggerTask.server.ts (1 hunks)
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (6 hunks)
- apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (1 hunks)
- apps/webapp/app/services/realtime/types.ts (2 hunks)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (2 hunks)
- apps/webapp/app/services/realtimeClient.server.ts (1 hunks)
- apps/webapp/app/v3/services/triggerTask.server.ts (1 hunks)
- apps/webapp/package.json (2 hunks)
- apps/webapp/prisma/seed.ts (0 hunks)
- apps/webapp/prisma/seedCloud.ts (0 hunks)
- apps/webapp/seed.mts (1 hunks)
- internal-packages/database/prisma/migrations/20251020121543_add_realtime_streams_version_to_task_run/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20251020163612_add_realtime_streams_to_task_run/migration.sql (1 hunks)
- internal-packages/database/prisma/schema.prisma (1 hunks)
- internal-packages/run-engine/src/engine/index.ts (2 hunks)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2 hunks)
- internal-packages/run-engine/src/engine/types.ts (1 hunks)
- packages/cli-v3/src/entryPoints/dev-run-worker.ts (1 hunks)
- packages/core/package.json (1 hunks)
- packages/core/src/v3/apiClient/index.ts (7 hunks)
- packages/core/src/v3/apiClient/runStream.ts (6 hunks)
- packages/core/src/v3/apiClientManager/index.ts (1 hunks)
- packages/core/src/v3/apiClientManager/types.ts (2 hunks)
- packages/core/src/v3/runMetadata/manager.ts (3 hunks)
- packages/core/src/v3/runMetadata/metadataStream.ts (6 hunks)
- packages/core/src/v3/runMetadata/s2MetadataStream.ts (1 hunks)
- packages/core/src/v3/runMetadata/types.ts (1 hunks)
- packages/core/src/v3/schemas/api.ts (2 hunks)
- packages/core/src/v3/schemas/common.ts (1 hunks)
- packages/trigger-sdk/src/v3/shared.ts (9 hunks)
- turbo.json (1 hunks)
💤 Files with no reviewable changes (2)
- apps/webapp/prisma/seed.ts
- apps/webapp/prisma/seedCloud.ts
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/v3/services/triggerTask.server.ts, apps/webapp/app/services/realtimeClient.server.ts, packages/core/src/v3/runMetadata/types.ts, internal-packages/run-engine/src/engine/types.ts, packages/core/src/v3/schemas/api.ts, internal-packages/run-engine/src/engine/index.ts, apps/webapp/app/runEngine/services/triggerTask.server.ts, packages/core/src/v3/apiClientManager/types.ts, packages/core/src/v3/apiClient/runStream.ts, packages/cli-v3/src/entryPoints/dev-run-worker.ts, internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts, apps/webapp/app/services/realtime/types.ts, packages/core/src/v3/schemas/common.ts, apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts, apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts, packages/core/src/v3/apiClientManager/index.ts, apps/webapp/app/models/organization.server.ts, apps/webapp/app/services/realtime/s2realtimeStreams.server.ts, packages/core/src/v3/runMetadata/metadataStream.ts, packages/core/src/v3/runMetadata/s2MetadataStream.ts, apps/webapp/app/env.server.ts, packages/trigger-sdk/src/v3/shared.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts, apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts, packages/core/src/v3/apiClient/index.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts, packages/core/src/v3/runMetadata/manager.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/v3/services/triggerTask.server.ts, apps/webapp/app/services/realtimeClient.server.ts, packages/core/src/v3/runMetadata/types.ts, packages/core/src/v3/schemas/api.ts, apps/webapp/app/runEngine/services/triggerTask.server.ts, packages/core/src/v3/apiClientManager/types.ts, packages/core/src/v3/apiClient/runStream.ts, apps/webapp/app/services/realtime/types.ts, packages/core/src/v3/schemas/common.ts, apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts, apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts, packages/core/src/v3/apiClientManager/index.ts, apps/webapp/app/models/organization.server.ts, apps/webapp/app/services/realtime/s2realtimeStreams.server.ts, packages/core/src/v3/runMetadata/metadataStream.ts, packages/core/src/v3/runMetadata/s2MetadataStream.ts, apps/webapp/app/env.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts, apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts, packages/core/src/v3/apiClient/index.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts, packages/core/src/v3/runMetadata/manager.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/v3/services/triggerTask.server.ts, apps/webapp/app/services/realtimeClient.server.ts, apps/webapp/app/runEngine/services/triggerTask.server.ts, apps/webapp/app/services/realtime/types.ts, apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts, apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts, apps/webapp/app/models/organization.server.ts, apps/webapp/app/services/realtime/s2realtimeStreams.server.ts, apps/webapp/app/env.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts, apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/v3/services/triggerTask.server.ts, apps/webapp/app/services/realtimeClient.server.ts, apps/webapp/app/runEngine/services/triggerTask.server.ts, apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts, apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts, apps/webapp/app/models/organization.server.ts, apps/webapp/app/services/realtime/s2realtimeStreams.server.ts, apps/webapp/app/env.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts, apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/v3/services/triggerTask.server.ts, apps/webapp/app/services/realtimeClient.server.ts, apps/webapp/app/runEngine/services/triggerTask.server.ts, apps/webapp/app/services/realtime/types.ts, apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts, apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts, apps/webapp/app/models/organization.server.ts, apps/webapp/app/services/realtime/s2realtimeStreams.server.ts, apps/webapp/app/env.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts, apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts, apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts
🧠 Learnings (2)
📚 Learning: 2025-08-18T10:07:17.368Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-08-18T10:07:17.368Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : When triggering a task multiple times in a loop from inside another task, use batchTrigger()/batchTriggerAndWait() instead of per-item trigger() calls
Applied to files:
packages/trigger-sdk/src/v3/shared.ts
📚 Learning: 2025-08-18T10:07:17.368Z
Learnt from: CR
PR: triggerdotdev/trigger.dev#0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-08-18T10:07:17.368Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use triggerAndWait() only from within a task context (not from generic app code) and handle result.ok or use unwrap() with error handling
Applied to files:
packages/trigger-sdk/src/v3/shared.ts
🧬 Code graph analysis (14)
packages/core/src/v3/apiClientManager/types.ts (1)
- packages/core/src/v3/apiClient/index.ts (1): ApiClientFutureFlags (130-132)

apps/webapp/app/services/realtime/types.ts (2)
- packages/core/src/v3/runMetadata/manager.ts (1): stream (320-326)
- packages/core/src/v3/apiClient/stream.ts (1): stream (180-182)

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (3)
- apps/webapp/app/env.server.ts (1): env (1219-1219)
- apps/webapp/app/services/realtime/types.ts (2): StreamIngestor (2-17), StreamResponder (20-28)
- apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (1): S2RealtimeStreams (33-258)

packages/core/src/v3/apiClientManager/index.ts (2)
- packages/core/src/v3/apiClientManager/types.ts (1): ApiClientConfiguration (3-19)
- packages/core/src/v3/apiClient/index.ts (1): ApiClient (151-1193)

apps/webapp/seed.mts (1)
- apps/webapp/app/models/organization.server.ts (1): createOrganization (20-77)

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (1)
- apps/webapp/app/services/realtime/types.ts (2): StreamResponder (20-28), StreamIngestor (2-17)

packages/core/src/v3/runMetadata/metadataStream.ts (1)
- packages/core/src/v3/runMetadata/types.ts (1): StreamInstance (33-35)

packages/core/src/v3/runMetadata/s2MetadataStream.ts (2)
- packages/core/src/v3/runMetadata/types.ts (1): StreamInstance (33-35)
- packages/core/src/v3/runMetadata/manager.ts (1): stream (320-326)

packages/trigger-sdk/src/v3/shared.ts (2)
- packages/core/src/v3/apiClientManager-api.ts (1): apiClientManager (5-5)
- packages/core/src/v3/apiClient/index.ts (1): TriggerApiRequestOptions (115-118)

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1): getRealtimeStreamInstance (25-51)

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
- packages/core/src/logger.ts (3): Logger (19-139), LogLevel (15-15), error (69-79)
- apps/webapp/app/services/realtime/types.ts (2): StreamIngestor (2-17), StreamResponder (20-28)

packages/core/src/v3/apiClient/index.ts (4)
- packages/core/src/v3/apiClientManager/index.ts (2): ApiClientConfiguration (97-99), accessToken (37-45)
- packages/core/src/v3/apiClientManager/types.ts (1): ApiClientConfiguration (3-19)
- packages/core/src/v3/apiClient/core.ts (3): ZodFetchOptions (31-39), ApiRequestOptions (43-43), zodfetch (71-78)
- packages/core/src/v3/schemas/api.ts (2): CreateStreamResponseBody (1289-1291), CreateStreamResponseBody (1292-1292)

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (3)
- apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts (1): getRealtimeStreamInstance (25-51)
- apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1): createLoaderApiRoute (97-287)
- apps/webapp/app/db.server.ts (1): $replica (103-106)

packages/core/src/v3/runMetadata/manager.ts (4)
- packages/core/src/v3/runMetadata/types.ts (2): RunMetadataManager (20-31), StreamInstance (33-35)
- packages/core/src/v3/apiClient/index.ts (1): headers (1181-1192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (23)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (actions)
🔇 Additional comments (40)
apps/webapp/app/services/realtimeClient.server.ts (1)
46-46: LGTM! Column addition aligns with schema updates.
The addition of "realtimeStreams" to DEFAULT_ELECTRIC_COLUMNS properly extends the projection to include the new realtime streams data.
packages/core/src/v3/schemas/api.ts (2)
978-978: LGTM! Schema addition supports realtime streams tracking.
The realtimeStreams field properly defaults to an empty array for backward compatibility with runs that don't have realtime streams.
1289-1292: LGTM! New schema for stream creation response.
The CreateStreamResponseBody schema provides type-safe responses for the new createStream API surface.
packages/cli-v3/src/entryPoints/dev-run-worker.ts (1)
513-516: LGTM! Proper version initialization with backward compatibility.
The streamsVersion initialization correctly defaults to "v1" for runs without explicit versioning, ensuring backward compatibility while supporting the new v2/S2 streaming paths.
apps/webapp/app/runEngine/services/triggerTask.server.ts (1)
350-350: LGTM! Version propagation through trigger flow.
The realtimeStreamsVersion is correctly propagated from options to the engine trigger payload, enabling versioned realtime streaming for triggered runs.
packages/core/src/v3/apiClientManager/types.ts (1)
1-1: LGTM! Future flags enable progressive feature rollout.
The addition of ApiClientFutureFlags to the configuration properly supports feature gating for the new v2 realtime streams functionality.
Also applies to: 18-18
apps/webapp/package.json (1)
18-18: LGTM! Migration to tsx for seed script execution.
The switch from ts-node to tsx is a modern improvement for TypeScript execution, and the seed script path correctly points to the new seed.mts file.
Also applies to: 279-279
apps/webapp/seed.mts (2)
57-70: Verify hardcoded project references work across environments.
The externalRef values are hardcoded for specific reference projects. Ensure these values are appropriate for all target environments (local dev, CI, etc.), or consider making them configurable via environment variables if different environments need different refs.
1-132: LGTM! Well-structured seed script with clear logging.
The seed script is well-organized with:
- Proper error handling and cleanup
- Clear console output for tracking progress
- Idempotent operations (find or create pattern)
- Helpful summary at completion
turbo.json (1)
38-43: LGTM! Pipeline simplified to match new seed approach.
The dependency change from `build:db:seed` to `build` correctly reflects the removal of the intermediate build step and aligns with the tsx-based seed execution.
65-78: LGTM! Clean implementation of per-call configuration override.
The new signature enables callers to provide custom configuration on a per-call basis while preserving backwards compatibility. The precedence logic correctly prioritizes the provided config over global defaults, and the ApiClient construction properly forwards all parameters including the new futureFlags.
internal-packages/database/prisma/migrations/20251020163612_add_realtime_streams_to_task_run/migration.sql (1)
1-2: LGTM! Migration syntax is correct.
The migration properly adds a TEXT array column with a default empty array. This complements the realtimeStreamsVersion column added in the companion migration.
packages/core/src/v3/schemas/common.ts (1)
339-344: LGTM! Schema extension maintains backwards compatibility.
The optional realtimeStreamsVersion field is correctly added to the TaskRunExecution schema using zod's .and() pattern, consistent with the adjacent traceContext field.
internal-packages/database/prisma/migrations/20251020121543_add_realtime_streams_version_to_task_run/migration.sql (1)
1-2: LGTM! Migration safely adds versioning column.
The migration properly adds a NOT NULL column with a sensible default value of 'v1', ensuring existing rows are handled correctly.
internal-packages/run-engine/src/engine/types.ts (1)
151-151: LGTM! Type extension is straightforward.
The optional realtimeStreamsVersion field is appropriately added to TriggerParams, maintaining backwards compatibility and aligning with the schema and database changes.
apps/webapp/app/models/organization.server.ts (1)
69-69: Verify: v3Enabled now always true for all deployment types.
The previous logic conditionally disabled v3 for managed cloud environments (`!features.isManagedCloud`), but now all organizations will have v3Enabled set to true. Please confirm this is intentional and that managed cloud environments are ready for this change.
apps/webapp/app/v3/services/triggerTask.server.ts (1)
36-36: LGTM! Service options extended cleanly.
The optional realtimeStreamsVersion field is appropriately added to TriggerTaskServiceOptions, enabling versioned streaming configuration to flow through the trigger service.
packages/core/src/v3/runMetadata/types.ts (1)
33-35: LGTM! Clean interface for stream lifecycle management.
The StreamInstance interface provides a simple, focused contract for waiting on stream completion. The wait() method signature is idiomatic and the placement is appropriate.
internal-packages/run-engine/src/engine/index.ts (1)
392-393: Narrow TriggerParams.realtimeStreamsVersion to a union.
Plumbed correctly. To prevent invalid values reaching the DB, type TriggerParams.realtimeStreamsVersion as "v1" | "s2" (or a shared enum type) and keep relying on the Prisma default when undefined.
If not already done in internal-packages/run-engine/src/engine/types.ts, please update:
```ts
export type TriggerParams = {
  // ...
  realtimeStreamsVersion?: "v1" | "s2";
};
```

Also applies to: 473-474
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1)
434-435: Review comment is incorrect: realtimeStreamsVersion is already defined in the type.
TaskRunExecution.run already includes `realtimeStreamsVersion: z.string().optional()` in its schema definition. BackwardsCompatibleTaskRunExecution.run extends TaskRunExecution["run"] via intersection (&), which preserves all properties from the base type. Therefore, realtimeStreamsVersion is already a valid, type-safe property at line 599: no type mismatch exists, and no type extension is needed.
The assignment `realtimeStreamsVersion: updatedRun.realtimeStreamsVersion ?? undefined` will compile without errors.
Likely an incorrect or invalid review comment.
apps/webapp/app/services/realtime/types.ts (1)
25-27: Good: signal + Last-Event-ID support.
Signature matches the resume flow and enables cancellation cleanly. LGTM.
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)
156-159: Good: HEAD-only guard.
Clear 405 for non-HEAD improves semantics. LGTM.
packages/core/src/v3/apiClient/runStream.ts (1)
690-705: Stream key resolution logic looks solid.
Prefers explicit metadata.$$streams, then falls back to realtimeStreams. LGTM.
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (1)
43-57: SSE resume via Last-Event-ID: nice.
Clean wiring of lastEventId and environment-selected implementation. LGTM.
packages/core/src/v3/apiClient/index.ts (2)
1088-1110: createStream(): API surface LGTM.
Returns version plus raw headers for downstream parsing; aligns with manager.ts. No blocking issues.
1174-1176: Future flag header plumbing is correct.
Setting x-trigger-realtime-streams-version when unstable_v2RealtimeStreams is enabled matches the negotiation flow.
packages/trigger-sdk/src/v3/shared.ts (1)
568-569: Client config propagation LGTM.
Passing requestOptions?.clientConfig into apiClientManager.clientOrThrow() is consistent across entry points.
Also applies to: 733-734, 898-899, 1065-1066, 1154-1155, 1214-1215, 1307-1308, 1387-1388
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (5)
1-21: LGTM! Clean type definitions and imports.
The imports follow coding guidelines (using types over interfaces), and the StreamChunk union type provides good type safety for the two-stage pipeline. The options extension properly supports the new features.
23-30: LGTM! Proper initialization with sensible defaults.
The constructor correctly initializes the logger and inactivity timeout with reasonable fallback values (60 seconds for timeout).
32-37: LGTM! Appropriate no-op for Redis implementation.
This stub correctly implements the interface requirement. Redis-based streams don't need initialization headers (unlike S2, which returns access tokens).
39-247: LGTM! Robust streaming implementation with proper resumption and inactivity handling.
The method correctly implements:
- Last-Event-ID based resumption for SSE clients
- Periodic ping mechanism to keep connections alive
- Inactivity-based auto-close to prevent resource leaks
- Two-stage transform pipeline with proper SSE formatting
- Graceful cleanup on abort/error with retry logic
The Redis stream ID is correctly used as the SSE event ID, enabling reliable resumption.
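To make the framing concrete, here is an illustrative (not verbatim) sketch of the SSE frames this implies; tagging each frame with the Redis stream ID is what lets a client reconnect with Last-Event-ID:

```ts
// Illustrative only: shape of one SSE frame emitted by the transform pipeline
// (the real pipeline also emits pings and enforces inactivity timeouts).
function toSSEFrame(redisStreamId: string, payload: string): string {
  // e.g. "id: 1729500000000-0\ndata: {...}\n\n"
  return `id: ${redisStreamId}\ndata: ${payload}\n\n`;
}

// A reconnecting client resumes by replaying its last seen id:
// fetch(url, { headers: { "Last-Event-ID": lastSeenId } });
```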
332-407: LGTM! Robust client-specific chunk lookup with proper pagination.
The method correctly:
- Paginates backwards through the Redis stream using XREVRANGE
- Filters by clientId to find the last chunk for a specific client
- Handles legacy entries (END_SENTINEL, entries without chunkIndex)
- Uses proper Redis exclusive range syntax for cursor-based pagination
- Returns -1 on error/not-found (matches interface contract)
The previous review concern about skipping END_SENTINEL has been properly addressed.
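For readers unfamiliar with exclusive ranges, a rough sketch of this scan pattern with ioredis (entry field names are assumptions; the real method also skips sentinel and legacy entries):

```ts
import { Redis } from "ioredis";

// Walk the stream newest-to-oldest, 100 entries per page, until a chunk
// written by this clientId is found. Returns -1 when nothing matches.
async function lastChunkIndexForClient(redis: Redis, streamKey: string, clientId: string) {
  let cursor = "+"; // start at the newest entry
  while (true) {
    const page = await redis.xrevrange(streamKey, cursor, "-", "COUNT", 100);
    if (page.length === 0) return -1;
    for (const [, fields] of page) {
      // fields is a flat [key1, value1, key2, value2, ...] array
      const idx = fields.indexOf("chunkIndex");
      const owner = fields.indexOf("clientId");
      if (idx >= 0 && owner >= 0 && fields[owner + 1] === clientId) {
        return Number(fields[idx + 1]);
      }
    }
    // A "(" prefix makes the next XREVRANGE page exclusive of the last ID we saw.
    cursor = "(" + page[page.length - 1][0];
  }
}
```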
apps/webapp/app/services/realtime/s2realtimeStreams.server.ts (8)
1-31: LGTM! Clean type definitions following coding guidelines.
The types correctly model the S2 API surface and follow project guidelines (types over interfaces). Configuration options are well-documented.
33-62: LGTM! Sensible defaults and proper initialization.
Constructor correctly initializes all properties with reasonable defaults:
- 60s wait for long polling
- 25s heartbeat for HTTP/2 keepalive
- 200ms flush interval and 10 max retries (matches PR objectives)
64-70: LGTM! Clean helper methods for stream naming.
The methods correctly compose S2 stream names using the configured prefix and run hierarchy.
72-88: LGTM! Proper S2 token issuance for client-side writes.
The method correctly generates scoped access tokens and returns them via response headers, enabling clients to write directly to S2. This aligns with the S2 architecture, where clients write and the server only reads.
90-102: LGTM! Appropriate errors for unsupported server-side writes.
These methods correctly throw for S2 streams, since the architecture has clients write directly to S2 (not through the server). The error messages clearly explain why.
106-176: LGTM! Solid S2 streaming implementation with proper SSE formatting.
The method correctly:
- Resumes from lastEventId (parsed to S2 sequence number)
- Uses long polling with clamp=true to handle edge cases
- Formats records as SSE with proper id/data fields
- Implements heartbeat to keep HTTP/2 connections alive
- Handles abort signals and cleanup gracefully
The long-polling approach efficiently streams new data as it arrives.
180-210: LGTM! Secure token issuance with proper scoping.
The method correctly generates scoped S2 access tokens with:
- Basin and stream prefix restrictions
- Limited operations (append, create-stream only)
- 24-hour expiration
- Proper error handling
The scoping ensures clients can only write to their designated streams.
212-249: LGTM! Robust S2 read implementation.
The method correctly:
- Builds query parameters from options (with null checks)
- Encodes stream name for URL safety
- Sets appropriate headers (Authorization, Accept, S2-Format)
- Handles errors with full context
- Returns typed responses
The use of `S2-Format: raw` is appropriate for streaming use cases.
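For orientation, a hedged sketch of such a read call (the records path, wait parameter, and headers come from the comments above; the basin host format and the seq_num parameter name are assumptions):

```ts
// Assumption-laden sketch of a long-poll read against an S2 stream.
async function readS2Records(basin: string, stream: string, token: string, fromSeqNum?: number) {
  const params = new URLSearchParams({ wait: "60" }); // long-poll up to 60s
  if (fromSeqNum !== undefined) params.set("seq_num", String(fromSeqNum)); // assumed param name

  const res = await fetch(
    // Host shape is an assumption; substitute your basin endpoint.
    `https://${basin}.b.aws.s2.dev/v1/streams/${encodeURIComponent(stream)}/records?${params}`,
    {
      headers: {
        Authorization: `Bearer ${token}`,
        Accept: "application/json",
        "S2-Format": "raw",
      },
    }
  );
  if (!res.ok) throw new Error(`S2 read failed: ${res.status}`);
  return res.json();
}
```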
| "@opentelemetry/sdk-trace-base": "2.0.1", | ||
| "@opentelemetry/sdk-trace-node": "2.0.1", | ||
| "@opentelemetry/semantic-conventions": "1.36.0", | ||
| "@s2-dev/streamstore": "^0.15.13", |
🧩 Analysis chain
Verify new dependency versions and security advisories.
Two new runtime dependencies have been added to support realtime streaming resilience:
- @s2-dev/streamstore@^0.15.13: Supports S2-backed metadata streaming (less common scoped package)
- p-limit@^6.2.0: Well-known concurrency limiter for managing retry/backoff load
Please verify these packages exist, are from trusted sources, and have no known security vulnerabilities before merge.
Also applies to: 192-192
🌐 Web query:
What are the latest versions and any security advisories for @s2-dev/streamstore and p-limit npm packages?
💡 Result:
Summary (as of Oct 21, 2025):
-
@s2-dev/streamstore — latest published version: 0.15.9. No public security advisories found in standard scanners / package pages. [1][2]
-
p-limit — latest published version on npm: 7.1.1. No direct/public security advisories reported for the package (Snyk/other scanners show no known direct vulnerabilities). [3][4]
Sources:
- @s2-dev/streamstore package listing / analysis (Socket) — shows v0.15.9 as latest.
- s2.dev SDK docs / package references for @s2-dev/streamstore.
- p-limit on npm (package page, v7.1.1).
- Snyk vulnerability page for p-limit (no direct vulnerabilities reported).
If you want, I can: (a) fetch the exact npmjs page for @s2-dev/streamstore (if available) and its publish date, or (b) run a focused advisory search (npm advisories, GitHub security alerts, OSS CVE databases) for either package.
@s2-dev/[email protected] does not exist—update to available version.
The latest published version of @s2-dev/streamstore is 0.15.9, but the PR specifies ^0.15.13 on line 184. This version does not exist and will cause installation failures.
Update to @s2-dev/streamstore@^0.15.9 (or verify the correct intended version if this is a pre-release).
p-limit@^6.2.0 on line 192 is acceptable; no direct vulnerabilities are reported.
🤖 Prompt for AI Agents
In packages/core/package.json around line 184, the dependency entry
"@s2-dev/streamstore": "^0.15.13" references a non-existent version; change it
to an available release such as "^0.15.9" (or the correct intended version if
this was meant to be a prerelease) so installs don't fail, then run npm/yarn
install or update lockfile to verify resolution.
```ts
const { version, headers } = await this.apiClient.createStream(this.runId, target, key);

const parsedResponse = this.#parseCreateStreamResponse(version, headers);

const streamInstance =
  parsedResponse.version === "v1"
    ? new MetadataStream({
        key,
        runId: this.runId,
        source: $value,
        baseUrl: this.streamsBaseUrl,
        headers: this.apiClient.getHeaders(),
        signal,
        version,
        target,
      })
    : new S2MetadataStream({
        basin: parsedResponse.basin,
        stream: key,
        accessToken: parsedResponse.accessToken,
        source: $value,
        signal,
        limiter: (await import("p-limit")).default,
      });

this.activeStreams.set(key, streamInstance);

// Clean up when stream completes
streamInstance.wait().finally(() => this.activeStreams.delete(key));

return streamInstance;
```
Propagate S2 options and update streamsVersion.
- Parsed flushIntervalMs/maxRetries are ignored; pass them into S2MetadataStream.
- streamsVersion is never set; reflect the negotiated version so callers can inspect it.
Apply:
- const streamInstance =
+ const streamInstance =
parsedResponse.version === "v1"
? new MetadataStream({
...
version,
target,
})
: new S2MetadataStream({
basin: parsedResponse.basin,
stream: key,
accessToken: parsedResponse.accessToken,
source: $value,
signal,
limiter: (await import("p-limit")).default,
+ flushIntervalMs: parsedResponse.flushIntervalMs,
+ maxRetries: parsedResponse.maxRetries,
});
+ // Reflect negotiated version
+ this.streamsVersion = parsedResponse.version;| // Flushing state | ||
| private pendingFlushes: Array<T> = []; | ||
| private flushInterval: NodeJS.Timeout | null = null; | ||
| private flushPromises: Promise<void>[] = []; | ||
| private limiter: LimitFunction; | ||
| private retryCount = 0; | ||
| private readonly baseDelayMs = 1000; | ||
| private readonly maxDelayMs = 30000; | ||
|
|
Prevent flushPromises growth and honor AbortSignal.
- flushPromises grows unbounded. Prune entries on settle.
- Respect options.signal to stop buffering and clear interval promptly.
Apply:
private flushPromises: Promise<void>[] = [];
@@
private startPeriodicFlush(): void {
- this.flushInterval = setInterval(() => {
- this.flush().catch(() => {
+ this.flushInterval = setInterval(() => {
+ if (this.options.signal?.aborted) return;
+ this.flush().catch(() => {
// Errors are already logged in flush()
});
}, this.flushIntervalMs);
}
@@
private async flush(): Promise<void> {
if (this.pendingFlushes.length === 0) {
return;
}
@@
- this.flushPromises.push(flushPromise);
+ this.flushPromises.push(flushPromise);
+ void flushPromise.finally(() => {
+ const i = this.flushPromises.indexOf(flushPromise);
+ if (i >= 0) this.flushPromises.splice(i, 1);
+ });Optionally stop buffering on abort:
private startBuffering(): void {
this.streamReader = this.serverStream.getReader();
this.bufferReaderTask = (async () => {
try {
let chunkCount = 0;
while (true) {
+ if (this.options.signal?.aborted) break;
const { done, value } = await this.streamReader!.read();
if (done) {
this.streamComplete = true;
break;
}
this.pendingFlushes.push(value);
chunkCount++;
}
} catch (error) {
throw error;
}
})();
}Also applies to: 69-70, 142-149, 200-205
🤖 Prompt for AI Agents
In packages/core/src/v3/runMetadata/s2MetadataStream.ts around lines 66 to 74
(and similarly at 69-70, 142-149, 200-205), flushPromises is allowed to grow
unbounded and the AbortSignal from options is not honored; fix by pruning each
flush promise from flushPromises when it settles (attach .finally to remove the
promise from the array), ensure any new flushes push their promise into
flushPromises and are cleaned up on completion, and wire options.signal (if
provided) to stop buffering immediately by clearing the flush interval, aborting
or skipping further pushes to pendingFlushes, and resolving/clearing
pendingFlushes appropriately so no timers or buffers remain after abort.
Summary
This PR implements graceful recovery from `ECONNRESET` errors when streaming data from task executors to the server. The solution introduces a robust retry mechanism with exponential backoff and chunk-level resumption to ensure reliable real-time data streaming even in the face of network disruptions.

Problem
When tasks send streaming data to the server (e.g., logs, metadata), connection resets (`ECONNRESET`) could cause data loss and stream failures. This was particularly problematic for:

Solution
1. Ring Buffer Architecture (Client-side)
A ring buffer in `MetadataStream` keeps the last N chunks in memory (default: 1000), so already-sent chunks can be replayed after a connection drop; a sketch follows.
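A minimal sketch of that bookkeeping, with illustrative names (the real `MetadataStream` tracks more state, such as flush tasks and retry counters):

```ts
type BufferedChunk = { index: number; data: string };

// Chunk N always lives at slot N % maxSize, so once more than maxSize chunks
// have been produced, the oldest slots are silently overwritten.
class ChunkRingBuffer {
  private slots: Array<BufferedChunk | undefined>;

  constructor(private maxSize = 1000) {
    this.slots = new Array(maxSize);
  }

  push(chunk: BufferedChunk): void {
    this.slots[chunk.index % this.maxSize] = chunk;
  }

  get(index: number): BufferedChunk | undefined {
    const slot = this.slots[index % this.maxSize];
    // A mismatched index means the slot was already recycled.
    return slot && slot.index === index ? slot : undefined;
  }
}
```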
2. Chunk-based Resume Protocol
The client sends the `X-Resume-From-Chunk` header to resume streaming from a specific chunk index, as shown in the sketch below.
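A sketch of the handshake from the client's side (URL construction simplified; header names as in the PR):

```ts
// Ask the server how far it got; -1 means nothing was stored yet.
async function resolveResumePoint(streamUrl: string, headers: Record<string, string>) {
  const res = await fetch(streamUrl, { method: "HEAD", headers });
  const last = Number(res.headers.get("X-Last-Chunk-Index") ?? "-1");
  return Number.isInteger(last) ? last + 1 : 0;
}

// Resume the upload starting at the first chunk the server never saw.
async function resumeStream(streamUrl: string, headers: Record<string, string>, body: ReadableStream) {
  const resumeFrom = await resolveResumePoint(streamUrl, headers);
  return fetch(streamUrl, {
    method: "POST",
    headers: { ...headers, "X-Resume-From-Chunk": String(resumeFrom) },
    body,
    // Node's fetch (undici) requires half-duplex for streaming request bodies.
    // @ts-expect-error: "duplex" is not yet in the TS DOM types
    duplex: "half",
  });
}
```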
3. Retry Logic with Exponential Backoff
Failed uploads are retried with exponential backoff plus jitter, and only for retryable network errors such as `ECONNRESET` (see the recovery phase in the diagram below).
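A minimal sketch of that calculation (the retryable code list and 30s cap are assumptions based on the defaults visible elsewhere in this PR):

```ts
const RETRYABLE_CODES = new Set(["ECONNRESET", "ETIMEDOUT", "EPIPE"]); // assumed list

function isRetryable(error: unknown): boolean {
  const code = (error as NodeJS.ErrnoException)?.code;
  return code !== undefined && RETRYABLE_CODES.has(code);
}

async function withRetries<T>(fn: () => Promise<T>, maxRetries = 5, baseDelayMs = 1000): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= maxRetries || !isRetryable(error)) throw error;
      // Exponential backoff capped at 30s, with random jitter to avoid thundering herds.
      const delay = Math.min(baseDelayMs * 2 ** attempt, 30_000) * (0.5 + Math.random());
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```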
4. Server-side Chunk Tracking
- `RedisRealtimeStreams.ingestData()` now stores chunk indices with stream data
- A `getLastChunkIndex()` method retrieves the highest chunk index from Redis
- HEAD requests report progress via the `X-Last-Chunk-Index` header (see the sketch after this list)
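Roughly how these two server-side pieces cooperate, in ioredis terms (field names illustrative; the real code also tracks client IDs and sentinel entries):

```ts
import { Redis } from "ioredis";

// Ingest: persist each chunk together with its index.
async function ingestChunk(redis: Redis, streamKey: string, chunkIndex: number, data: string) {
  await redis.xadd(streamKey, "*", "chunkIndex", String(chunkIndex), "data", data);
}

// HEAD handler: the newest entry carries the highest chunk index.
async function getLastChunkIndex(redis: Redis, streamKey: string): Promise<number> {
  const [entry] = await redis.xrevrange(streamKey, "+", "-", "COUNT", 1);
  if (!entry) return -1;
  const [, fields] = entry;
  const i = fields.indexOf("chunkIndex");
  return i >= 0 ? Number(fields[i + 1]) : -1;
}
```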
5. Graceful Error Handling
- The server detects `ECONNRESET` during ingestion and returns a 500 status

Flow Diagram
```mermaid
sequenceDiagram
    participant Task as Task Executor
    participant Buffer as Ring Buffer
    participant Client as MetadataStream
    participant Server as Realtime API
    participant Redis as Redis Stream

    Note over Task,Buffer: Background Buffering
    Task->>Buffer: Stream data chunks
    loop Continuous buffering
        Buffer->>Buffer: Store chunk N in ring buffer
    end

    Note over Client,Server: Initial Streaming (Attempt 1)
    Client->>Buffer: Read chunk 0
    Client->>Server: POST chunk 0 (X-Resume-From-Chunk: 0)
    Server->>Redis: XADD chunk 0 (index: 0)
    Client->>Buffer: Read chunk 1
    Client->>Server: POST chunk 1
    Server->>Redis: XADD chunk 1 (index: 1)
    Client->>Buffer: Read chunk 2
    Client->>Server: POST chunk 2
    Server--xClient: ECONNRESET (connection lost)

    Note over Client,Server: Recovery Process
    Client->>Client: Detect ECONNRESET (retryable)
    Client->>Client: Calculate backoff delay
    Client->>Client: Wait (exponential backoff + jitter)
    Client->>Server: HEAD request to query progress
    Server->>Redis: XREVRANGE (get last chunk)
    Redis-->>Server: Last chunk: index 1
    Server-->>Client: X-Last-Chunk-Index: 1

    Note over Client,Server: Resume Streaming (Attempt 2)
    Client->>Buffer: Find chunks from index 2
    Client->>Server: POST chunks 2,3,4... (X-Resume-From-Chunk: 2)
    loop Resume transmission
        Server->>Redis: XADD chunk N (index: N)
    end
    Client->>Server: POST END_SENTINEL
    Server->>Redis: XADD END_SENTINEL + set TTL
    Server-->>Client: 200 OK

    Note over Server,Redis: Consumer reads stream
    Server->>Redis: XREAD from stream
    Redis-->>Server: All chunks including resumed ones
```
Client (
packages/core/src/v3/runMetadata/metadataStream.ts)Server (
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts)API Routes
X-Resume-From-Chunkheader (lines 19-27 in both routes)$targetroute)Testing Infrastructure
Benefits
Testing
🤖 Generated with Claude Code