diff --git a/docs/cli/me-memory.md b/docs/cli/me-memory.md index 14a26a2..26790b5 100644 --- a/docs/cli/me-memory.md +++ b/docs/cli/me-memory.md @@ -243,6 +243,12 @@ Skipped memories do not contribute to the exit code; only parse and engine error `--dry-run` validates parsing only; it does not predict id collisions with already-imported memories. Run with `--verbose` after a real import to see the skipped ids. +### Chunking and partial failures + +Large imports are sliced into multiple `batchCreate` requests under the hood to fit under the server's request-body limit. Each chunk is sent sequentially. If a chunk fails (network error, server error), siblings are not affected -- the successful chunks still land. The failed chunk's items are reported as `failed`, and the chunk-level error message appears in the `errors` array (sourced as `chunk N (K items)`). + +This means partial failures are now possible: `imported > 0` and `failed > 0` can both be true in the same run. Re-running the import with the same input will pick up where the previous run left off (already-inserted ids are skipped via `ON CONFLICT DO NOTHING`, missing ids are inserted). + --- ## me memory export diff --git a/docs/cli/me-pack.md b/docs/cli/me-pack.md index 025424f..ca4a42f 100644 --- a/docs/cli/me-pack.md +++ b/docs/cli/me-pack.md @@ -88,6 +88,15 @@ JSON mode (`--format json`) returns: | `skippedIdempotent` | Skipped because already present at this version. | | `skippedConflict` | Skipped because the id is held by something not from this pack/version. | | `skippedConflictIds` | Array of conflicting ids (only present when `skippedConflict > 0`). | +| `failed` | Memories in chunks that errored before reaching the server. | +| `failedIds` | Array of ids in failed chunks (only present when `failed > 0`). | +| `errors` | Per-chunk error detail: `{ chunkIndex, itemCount, ids, error }` (only present when `failed > 0`). | + +### Chunking and partial failures + +Large packs are sliced into multiple `batchCreate` requests under the hood to fit under the server's request-body limit. Chunks are sent sequentially. If a chunk fails (network error, server error), siblings are not affected -- the successful chunks still land. The failed memories are reported under `failed` / `failedIds` / `errors`. + +A partial install is crash-safe: re-running `me pack install` with the same pack file picks up where the previous run left off. The step-3 search finds the inserted memories as already-at-this-version (idempotent), and the missing ids are filled in. The text output adds a `└ N failed (chunk error — re-run to retry)` line and a `clack.log.error` block with per-chunk error detail when this happens. ### Example diff --git a/docs/mcp/me_memory_import.md b/docs/mcp/me_memory_import.md index 1c24b40..89b4ce2 100644 --- a/docs/mcp/me_memory_import.md +++ b/docs/mcp/me_memory_import.md @@ -28,13 +28,15 @@ See [File Formats](../formats.md) for full schema documentation, examples, and f { "imported": 2, "skipped": 1, + "failed": 0, "ids": [ "0194a000-0001-7000-8000-000000000001", "0194a000-0002-7000-8000-000000000002" ], "skippedIds": [ "0194a000-0003-7000-8000-000000000003" - ] + ], + "errors": [] } ``` @@ -42,11 +44,19 @@ See [File Formats](../formats.md) for full schema documentation, examples, and f |-------|------|-------------| | `imported` | `number` | Number of memories successfully imported on this call. | | `skipped` | `number` | Number of memories whose explicit `id` already existed in the engine. Always present (may be `0`). | +| `failed` | `number` | Number of memories in chunks that errored before reaching the server. Always present (may be `0`). | | `ids` | `string[]` | UUIDs of the memories actually inserted on this call. | | `skippedIds` | `string[]` | The explicit ids that were skipped because they already existed. Always present (may be empty). Inspect any of these with `me_memory_get` to see what's there. | +| `errors` | `Array<{ chunkIndex, itemCount, ids, error }>` | One entry per failed chunk. Always present (may be empty). | The tool is idempotent for memories with explicit ids: re-calling with the same arguments leaves the engine in the same state, with all previously-imported ids appearing in `skippedIds` instead of `ids`. Memories submitted without an explicit `id` get a server-generated UUIDv7 and never collide. +### Chunking and partial failures + +Large imports are sliced into multiple `batchCreate` requests under the hood to fit under the server's request-body limit. Chunks are sent sequentially. If a chunk fails, siblings are not affected -- the successful chunks still land. The failed chunk's items are reported under `failed`/`errors`, and re-calling with the same arguments will pick up where the previous call left off. + +The tool throws only when **every** chunk fails (total failure). For mixed outcomes it returns the partial-success detail above so the caller can decide how to react. + ## Examples ### Import from file (preferred) diff --git a/packages/cli/chunk.test.ts b/packages/cli/chunk.test.ts new file mode 100644 index 0000000..fcfba94 --- /dev/null +++ b/packages/cli/chunk.test.ts @@ -0,0 +1,215 @@ +/** + * Tests for the byte-aware chunker in `chunk.ts`. + */ +import { describe, expect, test } from "bun:test"; +import type { MemoryCreateParams } from "@memory.build/protocol/engine"; +import { + approxMemoryBytes, + type BatchCreateClient, + batchCreateChunked, + chunkByBytes, +} from "./chunk.ts"; + +describe("chunkByBytes", () => { + // Cheap size: each character is 1 byte. + const sizeAsLength = (s: string) => s.length; + + test("returns a single chunk when everything fits the budget", () => { + const chunks = Array.from( + chunkByBytes(["aa", "bb", "cc"], 100, 1000, sizeAsLength), + ); + expect(chunks).toEqual([["aa", "bb", "cc"]]); + }); + + test("cuts a new chunk when adding the next item would overflow the byte budget", () => { + const chunks = Array.from( + chunkByBytes(["aaaa", "bbbb", "cccc"], 6, 1000, sizeAsLength), + ); + // First two items: 4 + 4 = 8 > 6, so cut after 'aaaa'. Next: 4 + 4 = 8 > 6 + // again, cut after 'bbbb'. Then 'cccc' alone. + expect(chunks).toEqual([["aaaa"], ["bbbb"], ["cccc"]]); + }); + + test("packs as many items as fit before cutting", () => { + const chunks = Array.from( + chunkByBytes(["aa", "bb", "cc", "dd"], 5, 1000, sizeAsLength), + ); + // Running total: 2, 4 (still ≤5), 6 > 5 → cut. Next chunk starts at 'cc'. + expect(chunks).toEqual([ + ["aa", "bb"], + ["cc", "dd"], + ]); + }); + + test("cuts a new chunk when count cap is hit before byte budget", () => { + const chunks = Array.from( + chunkByBytes(["a", "b", "c", "d", "e"], 999, 2, sizeAsLength), + ); + expect(chunks).toEqual([["a", "b"], ["c", "d"], ["e"]]); + }); + + test("yields an oversized item alone instead of dropping it", () => { + const big = "x".repeat(100); + const chunks = Array.from( + chunkByBytes(["aa", big, "bb"], 10, 1000, sizeAsLength), + ); + // 'aa' fits, then 'big' would overflow → cut, big gets its own chunk + // (even though it exceeds the budget on its own), then 'bb' starts a new chunk. + expect(chunks).toEqual([["aa"], [big], ["bb"]]); + }); + + test("returns no chunks for empty input", () => { + const chunks = Array.from(chunkByBytes([], 100, 1000, sizeAsLength)); + expect(chunks).toEqual([]); + }); +}); + +describe("approxMemoryBytes", () => { + test("scales with content length", () => { + const small = approxMemoryBytes({ content: "hi", tree: "t" }); + const large = approxMemoryBytes({ + content: "x".repeat(10_000), + tree: "t", + }); + expect(large).toBeGreaterThan(small + 9_000); + }); + + test("includes meta and id contribution", () => { + const a = approxMemoryBytes({ content: "x", tree: "t" }); + const b = approxMemoryBytes({ + id: "00000000-0000-7000-8000-000000000001", + content: "x", + tree: "t", + meta: { source_session_id: "abc", source_message_id: "def" }, + }); + expect(b).toBeGreaterThan(a); + }); + + test("counts UTF-8 bytes, not UTF-16 code units, for non-ASCII content", () => { + // "abc" and "日本語" both have JS .length === 3, but the CJK string is + // 9 UTF-8 bytes (3 bytes per char) — so the wire-size estimate must + // differ. A String.length-based implementation would return identical + // values here, missing roughly 6 bytes of real wire weight per CJK + // memory and silently shrinking the headroom under the 1 MiB cap. + const ascii = approxMemoryBytes({ content: "abc", tree: "t" }); + const cjk = approxMemoryBytes({ content: "日本語", tree: "t" }); + expect(cjk).toBeGreaterThan(ascii + 5); + }); +}); + +describe("batchCreateChunked", () => { + /** + * Build a tiny memory whose serialized size is just `bytes` of "x" + * content. Lets each test control exactly how many chunks the byte + * budget produces without depending on the 768 KiB default. + */ + const mem = (id: string, contentBytes = 1): MemoryCreateParams => ({ + id, + content: "x".repeat(contentBytes), + tree: "t", + }); + + /** Minimal stub client; the test supplies the per-call behavior. */ + const stubClient = ( + handler: (memories: MemoryCreateParams[]) => Promise<{ ids: string[] }>, + ): BatchCreateClient => ({ + memory: { batchCreate: ({ memories }) => handler(memories) }, + }); + + test("single chunk, all succeed", async () => { + const calls: number[] = []; + const client = stubClient(async (memories) => { + calls.push(memories.length); + return { ids: memories.map((m) => m.id ?? "auto") }; + }); + const result = await batchCreateChunked(client, [mem("a"), mem("b")]); + expect(result.insertedIds).toEqual(["a", "b"]); + expect(result.failedIds).toEqual([]); + expect(result.errors).toEqual([]); + expect(calls).toEqual([2]); // single batchCreate call + }); + + test("two chunks succeed, insertedIds accumulate across chunks", async () => { + // Force two chunks via a tight byte budget by using big content. We + // can't override the 768 KiB default through the public API, so use + // many small memories and rely on the count cap... actually easier: + // use one big enough that two would overflow. + const big = mem("big", 700_000); + const small = mem("small", 10); + const client = stubClient(async (memories) => ({ + ids: memories.map((m) => m.id ?? "auto"), + })); + const result = await batchCreateChunked(client, [big, small]); + // Both items land; we don't assert chunk boundaries here, only that + // ids are accumulated correctly across however many chunks fired. + expect(result.insertedIds.sort()).toEqual(["big", "small"]); + expect(result.failedIds).toEqual([]); + expect(result.errors).toEqual([]); + }); + + test("second chunk fails: insertedIds from first only, failedIds from second", async () => { + const big1 = mem("a", 700_000); + const big2 = mem("b", 700_000); + let call = 0; + const client = stubClient(async (memories) => { + call++; + if (call === 2) throw new Error("server boom"); + return { ids: memories.map((m) => m.id ?? "auto") }; + }); + const result = await batchCreateChunked(client, [big1, big2]); + expect(result.insertedIds).toEqual(["a"]); + expect(result.failedIds).toEqual(["b"]); + expect(result.errors).toHaveLength(1); + expect(result.errors[0]).toMatchObject({ + chunkIndex: 1, + itemCount: 1, + ids: ["b"], + error: "server boom", + }); + }); + + test("all chunks fail: insertedIds empty, failedIds covers all explicit ids", async () => { + const big1 = mem("a", 700_000); + const big2 = mem("b", 700_000); + const client = stubClient(async () => { + throw new Error("network down"); + }); + const result = await batchCreateChunked(client, [big1, big2]); + expect(result.insertedIds).toEqual([]); + expect(result.failedIds.sort()).toEqual(["a", "b"]); + expect(result.errors).toHaveLength(2); + expect(result.errors[0]?.chunkIndex).toBe(0); + expect(result.errors[1]?.chunkIndex).toBe(1); + }); + + test("server returns shorter ids than requested (simulating ON CONFLICT)", async () => { + // Mimics post-#64 server behavior: caller submits 3 memories, server + // inserts 2 (one was a duplicate id, dropped by ON CONFLICT). The + // helper should faithfully report the 2 inserted; classifying the + // missing one as "skipped" is the caller's job. + const client = stubClient(async (memories) => ({ + ids: memories.map((m) => m.id ?? "auto").filter((id) => id !== "dup"), // server "drops" the dup id + })); + const result = await batchCreateChunked(client, [ + mem("a"), + mem("dup"), + mem("b"), + ]); + expect(result.insertedIds).toEqual(["a", "b"]); + expect(result.failedIds).toEqual([]); // no chunk failed + expect(result.errors).toEqual([]); + }); + + test("empty input never calls the server", async () => { + let calls = 0; + const client = stubClient(async () => { + calls++; + return { ids: [] }; + }); + const result = await batchCreateChunked(client, []); + expect(result.insertedIds).toEqual([]); + expect(result.failedIds).toEqual([]); + expect(result.errors).toEqual([]); + expect(calls).toBe(0); + }); +}); diff --git a/packages/cli/chunk.ts b/packages/cli/chunk.ts new file mode 100644 index 0000000..e05dc32 --- /dev/null +++ b/packages/cli/chunk.ts @@ -0,0 +1,191 @@ +/** + * Byte-aware chunker for `memory.batchCreate` requests. + * + * Callers (the agent-session importer, `me memory import`, the MCP import + * tool, `me pack install`) need to slice large insert sets into chunks + * small enough to fit under the server's request-body limit. A count-only + * cap is not enough: a single assistant turn with a large code block or + * tool result routinely exceeds 1 KB on its own, so a 1000-item chunk can + * easily blow past the server's 1 MiB cap and get rejected with HTTP 413. + * + * This module provides the generic plumbing (`chunkByBytes`) plus the + * importer-shaped defaults (`BATCH_CREATE_BYTES_BUDGET`, `BATCH_CREATE_CHUNK`, + * `approxMemoryBytes`) and a one-line wrapper (`chunkMemoriesForBatchCreate`) + * that callers should reach for unless they need a custom budget. + */ + +import type { MemoryCreateParams } from "@memory.build/protocol/engine"; + +/** + * Hard cap on memories per `memory.batchCreate` call. Matches the protocol + * limit; sessions or imports with more than this get split into chunks. + */ +export const BATCH_CREATE_CHUNK = 1000; + +/** + * Soft byte budget per `memory.batchCreate` request body, in UTF-8 bytes. + * + * The server caps request bodies at 1 MiB by default + * (`packages/server/middleware/size-limit.ts`). With a count-only cap of + * 1000, a single chunk of moderately-sized assistant messages routinely + * exceeds 1 MiB and the request is rejected with HTTP 413 — taking the + * entire chunk down with it. We instead cut chunks early when their + * estimated UTF-8 wire size approaches a budget that leaves room for the + * JSON-RPC envelope plus headers. + * + * 768 KiB leaves ~256 KiB of headroom under the 1 MiB default server limit. + * A single memory larger than the budget still gets sent in its own + * singleton chunk; if the server rejects it, the per-chunk catch records + * the failure without affecting siblings. + */ +export const BATCH_CREATE_BYTES_BUDGET = 768 * 1024; + +/** + * Approximate the UTF-8 wire size of a single `MemoryCreateParams` when + * it lands inside a JSON-RPC `memory.batchCreate` request. Accurate + * enough for chunking decisions; we don't model the envelope exactly. + * + * Uses `Buffer.byteLength(_, "utf8")` rather than `String.prototype.length` + * (which counts UTF-16 code units) so non-ASCII content — CJK code blocks, + * emoji, accented Latin — is sized at the byte count the server actually + * sees on the wire, not the JS character count. For ASCII-only content + * the two are identical; for CJK the byte count is ~3× the char count, + * and for supplementary-plane emoji 2× (4 bytes vs 2 surrogate units). + */ +export function approxMemoryBytes(m: MemoryCreateParams): number { + return Buffer.byteLength(JSON.stringify(m), "utf8"); +} + +/** + * Split `items` into chunks where each chunk's summed `size(item)` stays + * under `byteBudget`, capped at `countCap` items per chunk. An item whose + * own size exceeds the budget gets its own singleton chunk so the caller + * can still attempt it (and fail loudly server-side, rather than silently + * dropping it client-side). + * + * Exported for unit testing and direct use by callers that need a custom + * budget. Most callers should use `chunkMemoriesForBatchCreate` instead. + */ +export function* chunkByBytes( + items: T[], + byteBudget: number, + countCap: number, + size: (item: T) => number, +): Generator { + let chunk: T[] = []; + let bytes = 0; + for (const item of items) { + const itemBytes = size(item); + const wouldOverflow = chunk.length > 0 && bytes + itemBytes > byteBudget; + const atCountCap = chunk.length >= countCap; + if (wouldOverflow || atCountCap) { + yield chunk; + chunk = []; + bytes = 0; + } + chunk.push(item); + bytes += itemBytes; + } + if (chunk.length > 0) yield chunk; +} + +/** + * Convenience wrapper: chunk a `MemoryCreateParams[]` for `batchCreate` + * using the importer-shaped defaults — 1 MiB-aware byte budget, 1000-item + * count cap, JSON-stringify size estimate. Use this unless you have a + * specific reason to override the defaults. + */ +export function* chunkMemoriesForBatchCreate( + items: MemoryCreateParams[], +): Generator { + yield* chunkByBytes( + items, + BATCH_CREATE_BYTES_BUDGET, + BATCH_CREATE_CHUNK, + approxMemoryBytes, + ); +} + +/** + * Minimal client shape `batchCreateChunked` needs. Structurally typed so + * callers can pass an `EngineClient` or a stub in tests without coupling + * this module to the full client surface. + */ +export interface BatchCreateClient { + memory: { + batchCreate: (params: { + memories: MemoryCreateParams[]; + }) => Promise<{ ids: string[] }>; + }; +} + +/** Result of a chunked `batchCreate` run. */ +export interface BatchCreateChunkedResult { + /** Ids the server confirmed inserted (across all successful chunks). */ + insertedIds: string[]; + /** + * Explicit ids submitted in chunks that errored, flattened across all + * failed chunks for callers that just need a set of "ids to exclude + * from skip classification." For per-chunk error attribution use + * `errors[].ids` instead. + * + * These were never processed by the server, so they are neither + * inserted nor skipped. + */ + failedIds: string[]; + /** One entry per failed chunk. */ + errors: Array<{ + /** 0-based index of the failed chunk in submission order. */ + chunkIndex: number; + /** Total items in the chunk (including those without explicit ids). */ + itemCount: number; + /** Explicit ids in this chunk (subset of `itemCount`). */ + ids: string[]; + error: string; + }>; +} + +/** + * Run `client.memory.batchCreate` over `memories`, automatically slicing + * the input into chunks that fit under the server's request-body limit. + * + * Chunks are sent sequentially. A failed chunk is recorded once in + * `errors` and its explicit ids are added to `failedIds`; it does not + * abort siblings. Successful chunks contribute to `insertedIds`. + * + * Note: the returned `insertedIds` may be shorter than the number of + * inputs in successful chunks because the server uses + * `ON CONFLICT (id) DO NOTHING`. Use `computeSkippedIds` (or, for packs, + * `classifySkips` with `failedIds`) to classify the missing ids. + */ +export async function batchCreateChunked( + client: BatchCreateClient, + memories: MemoryCreateParams[], +): Promise { + const insertedIds: string[] = []; + const failedIds: string[] = []; + const errors: BatchCreateChunkedResult["errors"] = []; + let chunkIndex = 0; + + for (const chunk of chunkMemoriesForBatchCreate(memories)) { + try { + const { ids } = await client.memory.batchCreate({ memories: chunk }); + insertedIds.push(...ids); + } catch (error) { + const msg = error instanceof Error ? error.message : String(error); + const ids = chunk + .map((p) => p.id) + .filter((x): x is string => typeof x === "string"); + failedIds.push(...ids); + errors.push({ + chunkIndex, + itemCount: chunk.length, + ids, + error: msg, + }); + } + chunkIndex++; + } + + return { insertedIds, failedIds, errors }; +} diff --git a/packages/cli/commands/memory-import.ts b/packages/cli/commands/memory-import.ts index 902a8b7..4b1f935 100644 --- a/packages/cli/commands/memory-import.ts +++ b/packages/cli/commands/memory-import.ts @@ -10,6 +10,7 @@ import { readFile } from "node:fs/promises"; import { resolve } from "node:path"; import * as clack from "@clack/prompts"; import { Command } from "commander"; +import { batchCreateChunked } from "../chunk.ts"; import { createClient } from "../client.ts"; import { resolveCredentials } from "../credentials.ts"; import { getOutputFormat, output } from "../output.ts"; @@ -268,32 +269,46 @@ export function createMemoryImportCommand(): Command { let skippedIds: string[] = []; - try { - const createParams = allMemories.map(({ memory: mem }) => ({ - content: mem.content, - ...(mem.id ? { id: mem.id } : {}), - ...(mem.meta ? { meta: mem.meta } : {}), - ...(mem.tree ? { tree: mem.tree } : {}), - ...(mem.temporal ? { temporal: mem.temporal } : {}), - })); + const createParams = allMemories.map(({ memory: mem }) => ({ + content: mem.content, + ...(mem.id ? { id: mem.id } : {}), + ...(mem.meta ? { meta: mem.meta } : {}), + ...(mem.tree ? { tree: mem.tree } : {}), + ...(mem.temporal ? { temporal: mem.temporal } : {}), + })); - const explicitIds = createParams - .map((p) => p.id) - .filter((id): id is string => typeof id === "string"); + const explicitIds = createParams + .map((p) => p.id) + .filter((id): id is string => typeof id === "string"); - const response = await engine.memory.batchCreate({ - memories: createParams, - }); + // Chunked batch create — large imports are sliced under the + // server's request-body limit, and a single failed chunk doesn't + // take down the rest of the import. + const { + insertedIds, + failedIds, + errors: chunkErrors, + } = await batchCreateChunked(engine, createParams); - result.imported = response.ids.length; - result.ids = response.ids; - skippedIds = computeSkippedIds(explicitIds, response.ids); - } catch (error) { - const msg = error instanceof Error ? error.message : String(error); - result.errors.push({ source: "server", error: msg }); - result.failed = allMemories.length; + result.imported = insertedIds.length; + result.ids = insertedIds; + result.failed = failedIds.length; + for (const e of chunkErrors) { + result.errors.push({ + source: `chunk ${e.chunkIndex} (${e.itemCount} items)`, + error: e.error, + }); } + // Skipped = explicit ids requested but neither inserted nor in a + // failed chunk. Failed-chunk ids never reached the server, so they + // are not "skipped due to id collision" — they're a separate class + // already accounted for in `result.failed`. + const failedSet = new Set(failedIds); + skippedIds = computeSkippedIds(explicitIds, insertedIds).filter( + (id) => !failedSet.has(id), + ); + // Output results const skipped = skippedIds.length; output( diff --git a/packages/cli/commands/pack.test.ts b/packages/cli/commands/pack.test.ts index 79867ff..cf92aa9 100644 --- a/packages/cli/commands/pack.test.ts +++ b/packages/cli/commands/pack.test.ts @@ -137,4 +137,38 @@ describe("classifySkips", () => { expect(result.idempotent).toEqual([]); expect(result.conflict).toEqual([]); }); + + test("excludes failedIds from classification (failed != skipped)", () => { + // Chunk containing "b" errored — "b" never reached the server, so it + // must not be counted as either idempotent or conflict. Without the + // failedIds parameter it would have been mis-classified as conflict + // (no existing row → looks like a non-pack id collision). + const result = classifySkips({ + requestedIds: ["a", "b", "c"], + insertedIds: ["a"], + failedIds: ["b"], + existing: [{ id: "c", meta: { pack: { name: "foo", version: "1" } } }], + packName: "foo", + packVersion: "1", + }); + expect(result.idempotent).toEqual(["c"]); + expect(result.conflict).toEqual([]); + // "b" is in neither bucket — caller tracks it under `failed`. + }); + + test("handles all four categories in one classification", () => { + const result = classifySkips({ + requestedIds: ["inserted", "idem", "conflict", "failed"], + insertedIds: ["inserted"], + failedIds: ["failed"], + existing: [ + { id: "idem", meta: { pack: { name: "foo", version: "1" } } }, + { id: "conflict", meta: { pack: { name: "other", version: "1" } } }, + ], + packName: "foo", + packVersion: "1", + }); + expect(result.idempotent).toEqual(["idem"]); + expect(result.conflict).toEqual(["conflict"]); + }); }); diff --git a/packages/cli/commands/pack.ts b/packages/cli/commands/pack.ts index 910f6f5..0deddfb 100644 --- a/packages/cli/commands/pack.ts +++ b/packages/cli/commands/pack.ts @@ -9,6 +9,7 @@ import { readFileSync } from "node:fs"; import * as clack from "@clack/prompts"; import { Command } from "commander"; +import { batchCreateChunked } from "../chunk.ts"; import { createClient } from "../client.ts"; import { resolveCredentials } from "../credentials.ts"; import { getOutputFormat, output, table } from "../output.ts"; @@ -239,30 +240,38 @@ function createPackInstallCommand(): Command { ...(mem.temporal ? { temporal: mem.temporal } : {}), })); - const result = await engine.memory.batchCreate({ - memories: createParams, - }); + // Chunked batch create — large packs are sliced under the + // server's request-body limit, and a single failed chunk doesn't + // take down its siblings (re-running install will self-heal). + const { + insertedIds, + failedIds, + errors: chunkErrors, + } = await batchCreateChunked(engine, createParams); spin?.stop("Done"); // Post-#64 `batchCreate` returns only ids it actually inserted — // conflicting ids are silently skipped. Classify the skips so the - // user sees benign re-installs vs real id collisions. + // user sees benign re-installs vs real id collisions, excluding + // failed-chunk ids (those never reached the server). const requestedIds = createParams .map((p) => p.id) .filter((x): x is string => typeof x === "string"); const { idempotent, conflict } = classifySkips({ requestedIds, - insertedIds: result.ids, + insertedIds, + failedIds, existing: existing.results, packName, packVersion, }); - const installed = result.ids.length; + const installed = insertedIds.length; const skippedIdempotent = idempotent.length; const skippedConflict = conflict.length; const skipped = skippedIdempotent + skippedConflict; + const failed = failedIds.length; const jsonOut: Record = { pack: packName, @@ -272,10 +281,15 @@ function createPackInstallCommand(): Command { skipped, skippedIdempotent, skippedConflict, + failed, }; if (skippedConflict > 0) { jsonOut.skippedConflictIds = conflict; } + if (failed > 0) { + jsonOut.failedIds = failedIds; + jsonOut.errors = chunkErrors; + } output(jsonOut, fmt, () => { // Pure idempotent re-install — distinct success line. @@ -283,7 +297,8 @@ function createPackInstallCommand(): Command { installed === 0 && stale.length === 0 && skippedIdempotent > 0 && - skippedConflict === 0 + skippedConflict === 0 && + failed === 0 ) { clack.log.success( `Pack '${packName}' v${packVersion} already installed (${skippedIdempotent} ${skippedIdempotent === 1 ? "memory" : "memories"} present, no changes)`, @@ -303,6 +318,11 @@ function createPackInstallCommand(): Command { ` └ ${skippedIdempotent} already present (skipped)`, ); } + if (failed > 0) { + lines.push( + ` └ ${failed} failed (chunk error — re-run to retry)`, + ); + } clack.log.success(lines.join("\n")); } @@ -314,6 +334,18 @@ function createPackInstallCommand(): Command { `${skippedConflict} ${noun} not installed — id ${verb} with existing non-pack ${noun}:\n${idsList}\n Inspect with: me memory get `, ); } + + if (failed > 0) { + const errLines = chunkErrors + .map( + (e) => + ` chunk ${e.chunkIndex} (${e.itemCount} items): ${e.error}`, + ) + .join("\n"); + clack.log.error( + `${failed} ${failed === 1 ? "memory" : "memories"} failed before reaching the server:\n${errLines}\n Re-run \`me pack install\` to retry — already-installed memories will be skipped.`, + ); + } }); } catch (error) { handleError(error, fmt); @@ -394,24 +426,34 @@ function createPackListCommand(): Command { /** * `engine.memory.batchCreate` uses `ON CONFLICT (id) DO NOTHING` server-side, * so the returned `ids` array can be shorter than the request when conflicts - * occur. For pack install, those silent skips fall into two buckets: + * occur. For pack install, ids that didn't land fall into three buckets: * * - **idempotent**: the row is already present and tagged with this pack * name + version (a benign re-install of the same version) * - **conflict**: the id is held by something else — a different pack, * a different version, or a non-pack memory the user wrote themselves. * Surfaced as a warning so a real id collision isn't silently masked. + * - **failed (excluded here)**: the id was in a chunk that errored before + * reaching the server. Callers pass these via `failedIds` so they don't + * get mis-classified as conflicts; they're tracked separately under + * the `failed` bucket in the install output. * * Pure function exported for unit testing. */ export function classifySkips(args: { requestedIds: string[]; insertedIds: string[]; + /** + * Ids that were submitted but never reached the server because their + * containing chunk errored. Optional — if omitted, treated as empty. + */ + failedIds?: string[]; existing: ReadonlyArray<{ id: string; meta?: unknown }>; packName: string; packVersion: string; }): { idempotent: string[]; conflict: string[] } { const inserted = new Set(args.insertedIds); + const failed = new Set(args.failedIds ?? []); const existingById = new Map( args.existing.map((m) => [m.id, m.meta]), ); @@ -419,7 +461,7 @@ export function classifySkips(args: { const conflict: string[] = []; for (const id of args.requestedIds) { - if (inserted.has(id)) continue; + if (inserted.has(id) || failed.has(id)) continue; const meta = existingById.get(id); const packMeta = diff --git a/packages/cli/importers/index.ts b/packages/cli/importers/index.ts index bd559e7..521b190 100644 --- a/packages/cli/importers/index.ts +++ b/packages/cli/importers/index.ts @@ -15,6 +15,7 @@ */ import type { MemoryCreateParams } from "@memory.build/protocol/engine"; +import { batchCreateChunked } from "../chunk.ts"; import type { EngineClient } from "../client.ts"; import type { ProgressReporter } from "./progress.ts"; import { SlugRegistry } from "./slug.ts"; @@ -38,12 +39,6 @@ import { deterministicMessageUuidV7 } from "./uuid.ts"; */ export const IMPORTER_VERSION = "1"; -/** - * Maximum memories per `memory.batchCreate` call (matches the protocol - * limit). Sessions with more messages than this are split into chunks. - */ -const BATCH_CREATE_CHUNK = 1000; - /** * Maximum memories per `memory.search` lookup. Same protocol limit. A * session with more existing messages than this triggers a fallback to @@ -299,25 +294,25 @@ async function writeSession( } } - // Inserts: one batchCreate per chunk. + // Inserts: one batchCreate per chunk. Chunks are cut by byte budget OR + // count cap, whichever fires first, so a chunk's serialized request body + // stays under the server's request size limit. if (toInsert.length > 0) { if (options.dryRun) { outcome.inserted += toInsert.length; } else { - for (let i = 0; i < toInsert.length; i += BATCH_CREATE_CHUNK) { - const chunk = toInsert.slice(i, i + BATCH_CREATE_CHUNK); - try { - await engine.memory.batchCreate({ memories: chunk }); - outcome.inserted += chunk.length; - } catch (error) { - const msg = error instanceof Error ? error.message : String(error); - outcome.failed += chunk.length; - for (const c of chunk) { - outcome.errors.push({ - messageId: c.id ?? "(unknown)", - error: msg, - }); - } + const { insertedIds, errors } = await batchCreateChunked( + engine, + toInsert, + ); + outcome.inserted += insertedIds.length; + // Each chunk error contributes its full itemCount to `failed` and + // attaches the same message to each id in that chunk — matching the + // pre-chunking behavior of one error row per attempted message. + for (const e of errors) { + outcome.failed += e.itemCount; + for (const id of e.ids) { + outcome.errors.push({ messageId: id, error: e.error }); } } } diff --git a/packages/cli/mcp/server.ts b/packages/cli/mcp/server.ts index 73efc17..8ba7cc3 100644 --- a/packages/cli/mcp/server.ts +++ b/packages/cli/mcp/server.ts @@ -13,6 +13,7 @@ import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js" import { stringify as yamlStringify } from "yaml"; import { z } from "zod"; import { CLIENT_VERSION } from "../../../version"; +import { batchCreateChunked } from "../chunk.ts"; import type { EngineClient } from "../client.ts"; import { createClient } from "../client.ts"; import { formatMemoryAsMarkdown } from "../commands/memory.ts"; @@ -571,7 +572,11 @@ Docs: ${docUrl("me_memory_import")}`, readOnlyHint: false, destructiveHint: false, // Server-side `ON CONFLICT (id) DO NOTHING` makes repeat calls with - // the same explicit ids land the engine in the same state. + // the same explicit ids land the engine in the same state. With + // chunking, a partial-failure call can be retried safely: ids + // already inserted are skipped, ids in failed chunks are + // re-attempted, and the final state converges to "all submitted + // ids present" once at least one call gets each chunk through. idempotentHint: true, }, }, @@ -666,14 +671,33 @@ Docs: ${docUrl("me_memory_import")}`, .map((m) => m.id) .filter((id): id is string => typeof id === "string"); - const result = await client.memory.batchCreate({ - memories: allMemories, - }); + // Chunked batch create — large imports are sliced under the + // server's request-body limit, and a single failed chunk doesn't + // take down the rest of the import. + const { insertedIds, failedIds, errors } = await batchCreateChunked( + client, + allMemories, + ); + + // Throw only on total failure — the agent should see partial-success + // detail rather than an opaque error for mixed outcomes. + if (insertedIds.length === 0 && errors.length > 0) { + throw new Error( + errors.length === 1 + ? errors[0]?.error + : `All ${errors.length} chunks failed; first error: ${errors[0]?.error}`, + ); + } // Server-side `ON CONFLICT (id) DO NOTHING` may silently drop // duplicate ids; surface those so the caller can investigate. - const insertedSet = new Set(result.ids); - const skippedIds = explicitIds.filter((id) => !insertedSet.has(id)); + // Failed-chunk ids never reached the server, so they're not + // skipped — they're reported separately under `failed`/`errors`. + const insertedSet = new Set(insertedIds); + const failedSet = new Set(failedIds); + const skippedIds = explicitIds.filter( + (id) => !insertedSet.has(id) && !failedSet.has(id), + ); return { content: [ @@ -681,10 +705,12 @@ Docs: ${docUrl("me_memory_import")}`, type: "text" as const, text: JSON.stringify( { - imported: result.ids.length, + imported: insertedIds.length, skipped: skippedIds.length, - ids: result.ids, + failed: failedIds.length, + ids: insertedIds, skippedIds, + errors, }, null, 2,