Skip to content

Commit

Permalink
Merge branch 'llm-1942-implement-streaming-from-database-for-csvjsonl…
Browse files Browse the repository at this point in the history
…-exports' of github.com:lunary-ai/lunary into llm-1942-implement-streaming-from-database-for-csvjsonl-exports
  • Loading branch information
hughcrt committed Nov 6, 2024
2 parents cf14ff8 + 6ab8d7d commit 5454ea8
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
1 change: 1 addition & 0 deletions packages/backend/src/api/v1/auth/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const publicRoutes = [
`/v1/runs/ingest`,
new RegExp(`/v1/runs/.+/public`), // public run data
new RegExp(`/v1/runs/.+/feedback`), // getFeedback in SDKs
new RegExp(`/v1/runs/download/.+`), // run exports
`/v1/template_versions/latest`,
`/v1/template-versions/latest`,
"/v1/users/verify-email",
Expand Down
6 changes: 6 additions & 0 deletions packages/backend/src/api/v1/runs/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ function getTraceChildren(run: Run, runs: Run[]): TraceRun {
};
}

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export async function fileExport(
{ ctx, sql, cursor, formatRun, projectId }: ExportType,
exportFormat: "csv" | "ojsonl" | "jsonl",
Expand All @@ -121,6 +125,8 @@ export async function fileExport(
const stream = Readable.from({
async *[Symbol.asyncIterator]() {
for await (const [row] of cursor) {
// TODO: Remove this. Simulate a large dataset
await sleep(25);
yield parser.parse(formatRun(row));
}
},
Expand Down
36 changes: 31 additions & 5 deletions packages/backend/src/api/v1/runs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import { checkAccess } from "@/src/utils/authorization";
import { jsonrepair } from "jsonrepair";
import { z } from "zod";

import crypto from "crypto";

const EXPORTERS = new Map()

/**
* @openapi
* components:
Expand Down Expand Up @@ -546,12 +550,13 @@ runs.get("/", async (ctx: Context) => {
`;

if (exportFormat) {
const token = crypto.randomBytes(32).toString('hex');
const cursor = query.cursor();
return fileExport(
{ ctx, sql, cursor, formatRun, projectId },
exportFormat,
exportType,
);

EXPORTERS.set(token, { cursor, projectId, exportFormat, exportType });

ctx.body = { token };
return;
}

const rows = await query;
Expand Down Expand Up @@ -1075,4 +1080,25 @@ runs.delete("/:id", checkAccess("logs", "delete"), async (ctx: Context) => {
ctx.status = 200;
});

runs.get("/download/:token", async (ctx) => {
const { token } = z
.object({ token: z.string() })
.parse(ctx.params);

const exporter = EXPORTERS.get(token);
if (!exporter) {
ctx.throw(404, "Export not found");
}

// One time use
EXPORTERS.delete(token);

const { cursor, projectId, exportFormat, exportType } = exporter;
return fileExport(
{ ctx, sql, cursor, formatRun, projectId },
exportFormat,
exportType,
);
});

export default runs;
3 changes: 1 addition & 2 deletions packages/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@
"shared": "*",
"streamsaver": "^2.0.6",
"swr": "^2.2.4",
"zod": "^3.23.8",
"streamsaver": "^2.0.6"
"zod": "^3.23.8"
},
"devDependencies": {
"@types/streamsaver": "^2.0.5",
Expand Down
10 changes: 4 additions & 6 deletions packages/frontend/utils/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@ async function getFile(path) {
throw new Error(message);
}

const { createWriteStream } = await import("streamsaver");
const contentType = res.headers.get("Content-Type") as string;
const fileExtension = contentType.split("/")[1];

const fileStream = createWriteStream(`export.${fileExtension}`);
await res.body?.pipeTo(fileStream);
const data = await res.json();
if (data.token) {
window.location.assign(buildUrl(`/runs/download/${data.token}`));
}
}

async function getStream(url, args, onChunk) {
Expand Down

0 comments on commit 5454ea8

Please sign in to comment.