From 6ab8d7da31c42260a31b28f17a4c98b00970e6ce Mon Sep 17 00:00:00 2001 From: 7HR4IZ3 Date: Tue, 22 Oct 2024 23:01:58 -0700 Subject: [PATCH] feat: export stream download via one time download link --- packages/backend/src/api/v1/auth/utils.ts | 1 + packages/backend/src/api/v1/runs/export.ts | 6 ++++ packages/backend/src/api/v1/runs/index.ts | 36 +++++++++++++++++++--- packages/frontend/package.json | 3 +- packages/frontend/utils/fetcher.ts | 10 +++--- 5 files changed, 43 insertions(+), 13 deletions(-) diff --git a/packages/backend/src/api/v1/auth/utils.ts b/packages/backend/src/api/v1/auth/utils.ts index 256a527b..a25ba448 100644 --- a/packages/backend/src/api/v1/auth/utils.ts +++ b/packages/backend/src/api/v1/auth/utils.ts @@ -87,6 +87,7 @@ const publicRoutes = [ `/v1/runs/ingest`, new RegExp(`/v1/runs/.+/public`), // public run data new RegExp(`/v1/runs/.+/feedback`), // getFeedback in SDKs + new RegExp(`/v1/runs/download/.+`), // run exports `/v1/template_versions/latest`, `/v1/template-versions/latest`, "/v1/users/verify-email", diff --git a/packages/backend/src/api/v1/runs/export.ts b/packages/backend/src/api/v1/runs/export.ts index 5c971c1c..480dd6a2 100644 --- a/packages/backend/src/api/v1/runs/export.ts +++ b/packages/backend/src/api/v1/runs/export.ts @@ -107,6 +107,10 @@ function getTraceChildren(run: Run, runs: Run[]): TraceRun { }; } +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + export async function fileExport( { ctx, sql, cursor, formatRun, projectId }: ExportType, exportFormat: "csv" | "ojsonl" | "jsonl", @@ -121,6 +125,8 @@ export async function fileExport( const stream = Readable.from({ async *[Symbol.asyncIterator]() { for await (const [row] of cursor) { + // TODO: Remove this. Simulate a large dataset + await sleep(25); yield parser.parse(formatRun(row)); } }, diff --git a/packages/backend/src/api/v1/runs/index.ts b/packages/backend/src/api/v1/runs/index.ts index 9d20a045..be2faa73 100644 --- a/packages/backend/src/api/v1/runs/index.ts +++ b/packages/backend/src/api/v1/runs/index.ts @@ -10,6 +10,10 @@ import { checkAccess } from "@/src/utils/authorization"; import { jsonrepair } from "jsonrepair"; import { z } from "zod"; +import crypto from "crypto"; + +const EXPORTERS = new Map() + /** * @openapi * components: @@ -546,12 +550,13 @@ runs.get("/", async (ctx: Context) => { `; if (exportFormat) { + const token = crypto.randomBytes(32).toString('hex'); const cursor = query.cursor(); - return fileExport( - { ctx, sql, cursor, formatRun, projectId }, - exportFormat, - exportType, - ); + + EXPORTERS.set(token, { cursor, projectId, exportFormat, exportType }); + + ctx.body = { token }; + return; } const rows = await query; @@ -1075,4 +1080,25 @@ runs.delete("/:id", checkAccess("logs", "delete"), async (ctx: Context) => { ctx.status = 200; }); +runs.get("/download/:token", async (ctx) => { + const { token } = z + .object({ token: z.string() }) + .parse(ctx.params); + + const exporter = EXPORTERS.get(token); + if (!exporter) { + ctx.throw(404, "Export not found"); + } + + // One time use + EXPORTERS.delete(token); + + const { cursor, projectId, exportFormat, exportType } = exporter; + return fileExport( + { ctx, sql, cursor, formatRun, projectId }, + exportFormat, + exportType, + ); +}); + export default runs; diff --git a/packages/frontend/package.json b/packages/frontend/package.json index 451842aa..c6d8f665 100644 --- a/packages/frontend/package.json +++ b/packages/frontend/package.json @@ -38,8 +38,7 @@ "recharts": "^2.12.7", "shared": "*", "swr": "^2.2.4", - "zod": "^3.23.8", - "streamsaver": "^2.0.6" + "zod": "^3.23.8" }, "devDependencies": { "@types/streamsaver": "^2.0.5", diff --git a/packages/frontend/utils/fetcher.ts b/packages/frontend/utils/fetcher.ts index 2d4e1d45..d0a88c60 100644 --- a/packages/frontend/utils/fetcher.ts +++ b/packages/frontend/utils/fetcher.ts @@ -44,12 +44,10 @@ async function getFile(path) { throw new Error(message); } - const { createWriteStream } = await import("streamsaver"); - const contentType = res.headers.get("Content-Type") as string; - const fileExtension = contentType.split("/")[1]; - - const fileStream = createWriteStream(`export.${fileExtension}`); - await res.body?.pipeTo(fileStream); + const data = await res.json(); + if (data.token) { + window.location.assign(buildUrl(`/runs/download/${data.token}`)); + } } async function getStream(url, args, onChunk) {