diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 46cd39f2..a9a32dd2 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -281,6 +281,7 @@ export type { ProviderSettings, SandboxFactory, SessionData, + SessionDelta, SessionEnv, SessionOptions, SessionStore, diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index f0abe1e0..47a65d9f 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -8,6 +8,7 @@ export type { FlueEvent, FlueEventCallback, SessionData, + SessionDelta, SessionStore, SessionEnv, FileStat, diff --git a/packages/sdk/src/session.ts b/packages/sdk/src/session.ts index 91f9401e..43eb944d 100644 --- a/packages/sdk/src/session.ts +++ b/packages/sdk/src/session.ts @@ -221,6 +221,11 @@ export class SessionHistory { return path.reverse(); } + /** Entry ids in storage order, used by the SDK to compute saveDelta removals. */ + getEntryIds(): string[] { + return this.entries.map((entry) => entry.id); + } + /** * Active-path entries appended after `afterLeafId` (exclusive), in order. * @@ -405,6 +410,7 @@ export class Session implements FlueSession { private env: SessionEnv; private store: SessionStore; private history: SessionHistory; + private lastSavedEntryIds: Set; private createdAt: string | undefined; private compactionSettings: CompactionSettings; private overflowRecoveryAttempted = false; @@ -439,6 +445,9 @@ export class Session implements FlueSession { this.createdAt = options.existingData?.createdAt; this.history = SessionHistory.fromData(options.existingData); + // Pre-existing entries are considered "already saved" — adapters that + // implement saveDelta? get only changes made after construction. + this.lastSavedEntryIds = new Set(this.history.getEntryIds()); const cc = this.config.compaction; this.compactionSettings = { @@ -1244,7 +1253,23 @@ export class Session implements FlueSession { const now = new Date().toISOString(); const data = this.history.toData(this.metadata, this.createdAt ?? now, now); if (!this.createdAt) this.createdAt = now; - await this.store.save(this.storageKey, data); + const currentEntryIds = new Set(data.entries.map((entry) => entry.id)); + if (typeof this.store.saveDelta === 'function') { + const newEntries = data.entries.filter((entry) => !this.lastSavedEntryIds.has(entry.id)); + const removedEntryIds = [...this.lastSavedEntryIds].filter((entryId) => !currentEntryIds.has(entryId)); + await this.store.saveDelta(this.storageKey, { + version: data.version, + newEntries, + removedEntryIds, + leafId: data.leafId, + metadata: data.metadata, + createdAt: data.createdAt, + updatedAt: data.updatedAt, + }); + } else { + await this.store.save(this.storageKey, data); + } + this.lastSavedEntryIds = currentEntryIds; } private async recordTaskSession( diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts index 65af7ac4..458b0562 100644 --- a/packages/sdk/src/types.ts +++ b/packages/sdk/src/types.ts @@ -536,10 +536,66 @@ export interface BranchSummaryEntry extends SessionEntryBase { details?: unknown; } +/** + * Delta passed to `SessionStore.saveDelta?`. Contains the entries appended + * since the last successful save, any entries removed from the current + * `SessionData`, and the current session header fields as full overwrites. + * + * Most history changes are append-only: compaction and branch summaries push + * new entries without mutating older ones. Overflow recovery is the exception: + * the SDK may remove a failed assistant leaf before retrying. Adapters must + * apply `removedEntryIds` before appending `newEntries` so `load()` returns + * the latest authoritative `SessionData`, not the union of all entries ever + * seen. + */ +export interface SessionDelta { + /** Session data version. Matches `SessionData.version`. */ + version: SessionData['version']; + /** Entries appended since the last save call (in order). */ + newEntries: SessionEntry[]; + /** Entry ids removed from the current session since the last save. */ + removedEntryIds: string[]; + /** Current leaf id (full overwrite). */ + leafId: string | null; + /** Current metadata (full overwrite — small object). */ + metadata: Record; + /** Session creation timestamp (full overwrite). */ + createdAt: string; + /** Session update timestamp for this save (full overwrite). */ + updatedAt: string; +} + export interface SessionStore { save(id: string, data: SessionData): Promise; load(id: string): Promise; delete(id: string): Promise; + + /** + * Optional delta hook. If implemented, it is called by live `Session` + * instances *instead of* `save()` with only the entry changes since the + * last successful save. Adapters that implement this can persist O(delta) + * per turn instead of O(history). + * + * Dispatch is checked per call via `typeof store.saveDelta === 'function'`, + * so adapters that implement both methods will only see `saveDelta` invoked + * for live session saves. `save()` remains required: Flue still uses it for + * initial empty session creation and for adapters that don't opt in. + * + * `load(id)` must still return the full `SessionData` — the adapter is + * responsible for reconstructing it from its records. + * + * When a session is resumed from `load()`, pre-existing entries are treated + * as already saved; the first `saveDelta` carries only changes made after + * construction. When `load()` returns null, Flue first writes an empty + * `SessionData` via `save()`, then later `saveDelta` calls carry entries + * appended after that empty snapshot. + * + * `newEntries.length === 0` is possible (a `save()` call with nothing to + * append) and adapters should still apply `removedEntryIds` plus the + * `leafId`/`metadata`/timestamp refresh. Empty `newEntries` is never a + * signal to delete all prior entries. + */ + saveDelta?(id: string, delta: SessionDelta): Promise; } // ─── Options ────────────────────────────────────────────────────────────────