From 3c82fe8cd8df8118475d2feb9e81f2119e066e75 Mon Sep 17 00:00:00 2001 From: ops Date: Sun, 18 Jan 2026 01:01:16 +0100 Subject: [PATCH] feat: add storage abstraction layer with SQLite provider --- packages/opencode/src/cli/cmd/sqlite.ts | 195 +++++++++ packages/opencode/src/config/config.ts | 13 + packages/opencode/src/index.ts | 2 + packages/opencode/src/project/instance.ts | 13 +- packages/opencode/src/session/index.ts | 2 +- packages/opencode/src/session/message-v2.ts | 3 +- .../opencode/src/storage/json-provider.ts | 112 +++++ packages/opencode/src/storage/provider.ts | 52 +++ .../opencode/src/storage/sqlite-provider.ts | 286 ++++++++++++ packages/opencode/src/storage/storage.ts | 208 ++++++--- packages/opencode/src/util/sqlite-storage.ts | 409 ++++++++++++++++++ sqlite-storage.example.json | 77 ++++ 12 files changed, 1303 insertions(+), 69 deletions(-) create mode 100644 packages/opencode/src/cli/cmd/sqlite.ts create mode 100644 packages/opencode/src/storage/json-provider.ts create mode 100644 packages/opencode/src/storage/provider.ts create mode 100644 packages/opencode/src/storage/sqlite-provider.ts create mode 100644 packages/opencode/src/util/sqlite-storage.ts create mode 100644 sqlite-storage.example.json diff --git a/packages/opencode/src/cli/cmd/sqlite.ts b/packages/opencode/src/cli/cmd/sqlite.ts new file mode 100644 index 00000000000..11eb50e389b --- /dev/null +++ b/packages/opencode/src/cli/cmd/sqlite.ts @@ -0,0 +1,195 @@ +import type { Argv } from "yargs" +import { cmd } from "./cmd" +import { UI } from "../ui" +import { SqliteStorage } from "../../util/sqlite-storage" +import { ENTITY_TYPES } from "../../storage/sqlite-provider" +import * as prompts from "@clack/prompts" + +export const SqliteCommand = cmd({ + command: "sqlite", + describe: "manage SQLite storage backend", + builder: (yargs: Argv) => + yargs.command(SqliteInitCommand).command(SqliteImportCommand).command(SqliteExportCommand).demandCommand(), + async handler() {}, +}) + +export const 
SqliteInitCommand = cmd({ + command: "init", + describe: "initialize SQLite database", + builder: (yargs: Argv) => + yargs + .option("force", { + describe: "overwrite existing database", + type: "boolean", + default: false, + }) + .option("config", { + describe: "path to sqlite-storage.json config", + type: "string", + }), + handler: async (args) => { + UI.empty() + prompts.intro("Initialize SQLite database") + + const storage = await SqliteStorage.create(args.config) + const dbPath = storage.dbPath() + const configPath = storage.configPath() + + prompts.log.info(`Using schema: ${configPath}`) + prompts.log.info(`Database: ${dbPath}`) + + if (await Bun.file(dbPath).exists()) { + if (!args.force) { + const confirm = await prompts.confirm({ + message: `Database already exists at ${dbPath}. Overwrite?`, + initialValue: false, + }) + + if (prompts.isCancel(confirm) || !confirm) { + throw new UI.CancelledError() + } + } + + await Bun.file(dbPath).writer().end() + await storage.init() + prompts.log.success(`Database re-initialized at ${dbPath}`) + } else { + await storage.init() + prompts.log.success(`Database created at ${dbPath}`) + } + + prompts.outro("Done") + }, +}) + +export const SqliteImportCommand = cmd({ + command: "import", + describe: "import JSON storage to SQLite", + builder: (yargs: Argv) => + yargs + .option("config", { + describe: "path to sqlite-storage.json config", + type: "string", + }) + .option("entity", { + describe: "entity type to import (all if not specified)", + type: "string", + choices: ENTITY_TYPES as unknown as string[], + }) + .option("verbose", { + describe: "show detailed progress", + type: "boolean", + default: false, + }), + handler: async (args) => { + UI.empty() + prompts.intro("Import JSON storage to SQLite") + + try { + const storage = await SqliteStorage.create(args.config) + const dbPath = storage.dbPath() + const configPath = storage.configPath() + + prompts.log.info(`Using schema: ${configPath}`) + prompts.log.info(`Database: 
${dbPath}`) + prompts.log.info(`Source: JSON storage`) + + if (!(await Bun.file(dbPath).exists())) { + prompts.log.error(`Database not found. Run 'opencode sqlite init' first`) + throw new UI.CancelledError() + } + + const spinner = prompts.spinner() + spinner.start("Importing data...") + + const result = await storage.importFromJSON({ + entity: args.entity as any, + verbose: args.verbose, + onProgress: (entity, count) => { + if (args.verbose) { + spinner.message(`Importing ${entity}: ${count} records`) + } + }, + }) + + spinner.stop("Import complete") + + prompts.log.success( + `Imported ${result.message} messages, ${result.part} parts, ${result.session} sessions, ${result.project} projects, ${result.todo} todos`, + ) + + prompts.outro("Done") + } catch (error) { + prompts.log.error(`Import failed: ${error instanceof Error ? error.message : String(error)}`) + if (error instanceof Error && error.stack) { + console.error(error.stack) + } + throw error + } + }, +}) + +export const SqliteExportCommand = cmd({ + command: "export", + describe: "export SQLite storage to JSON", + builder: (yargs: Argv) => + yargs + .option("config", { + describe: "path to sqlite-storage.json config", + type: "string", + }) + .option("entity", { + describe: "entity type to export (all if not specified)", + type: "string", + choices: ENTITY_TYPES as unknown as string[], + }) + .option("verbose", { + describe: "show detailed progress", + type: "boolean", + default: false, + }) + .option("force", { + describe: "overwrite existing JSON files", + type: "boolean", + default: false, + }), + handler: async (args) => { + UI.empty() + prompts.intro("Export SQLite storage to JSON") + + const storage = await SqliteStorage.create(args.config) + const dbPath = storage.dbPath() + const configPath = storage.configPath() + + prompts.log.info(`Using schema: ${configPath}`) + prompts.log.info(`Database: ${dbPath}`) + prompts.log.info(`Target: JSON storage`) + + if (!(await Bun.file(dbPath).exists())) { + 
prompts.log.error(`Database not found at ${dbPath}`) + throw new UI.CancelledError() + } + + const spinner = prompts.spinner() + spinner.start("Exporting data...") + + const result = await storage.exportToJSON({ + entity: args.entity as any, + verbose: args.verbose, + force: args.force, + onProgress: (entity, count) => { + if (args.verbose) { + spinner.message(`Exporting ${entity}: ${count} records`) + } + }, + }) + + spinner.stop("Export complete") + + prompts.log.success( + `Exported ${result.message} messages, ${result.part} parts, ${result.session} sessions, ${result.project} projects, ${result.todo} todos`, + ) + + prompts.outro("Done") + }, +}) diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 1574c644d32..62aca418f16 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -1024,6 +1024,19 @@ export namespace Config { prune: z.boolean().optional().describe("Enable pruning of old tool outputs (default: true)"), }) .optional(), + storage: z + .object({ + backend: z.enum(["json", "sqlite"]).optional().describe("Storage backend to use (default: json)"), + sqlite: z + .object({ + database: z.string().optional().describe("Path to SQLite database file"), + config: z.string().optional().describe("Path to sqlite-storage.json config file"), + }) + .optional() + .describe("SQLite storage configuration"), + }) + .optional() + .describe("Storage backend configuration"), experimental: z .object({ hook: z diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 6dc5e99e91e..dd0c18c2121 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -26,6 +26,7 @@ import { EOL } from "os" import { WebCommand } from "./cli/cmd/web" import { PrCommand } from "./cli/cmd/pr" import { SessionCommand } from "./cli/cmd/session" +import { SqliteCommand } from "./cli/cmd/sqlite" process.on("unhandledRejection", (e) => { 
Log.Default.error("rejection", { @@ -97,6 +98,7 @@ const cli = yargs(hideBin(process.argv)) .command(GithubCommand) .command(PrCommand) .command(SessionCommand) + .command(SqliteCommand) .fail((msg, err) => { if ( msg?.startsWith("Unknown argument") || diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index ddaa90f1e2b..41616227fab 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -5,17 +5,24 @@ import { State } from "./state" import { iife } from "@/util/iife" import { GlobalBus } from "@/bus/global" import { Filesystem } from "@/util/filesystem" +import { Storage } from "../storage/storage" -interface Context { +interface InstanceContext { directory: string worktree: string project: Project.Info } -const context = Context.create("instance") -const cache = new Map>() +const context = Context.create("instance") +const cache = new Map>() +let storageInitialized = false export const Instance = { async provide(input: { directory: string; init?: () => Promise; fn: () => R }): Promise { + if (!storageInitialized) { + await Storage.init() + storageInitialized = true + } + let existing = cache.get(input.directory) if (!existing) { Log.Default.info("creating instance", { directory: input.directory }) diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 3fcdab5238c..2b5347ca038 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -307,7 +307,7 @@ export namespace Session { export async function* list() { const project = Instance.project - for (const item of await Storage.list(["session", project.id])) { + for (const item of await Storage.list(["session", project.id], { orderBy: "-time.updated" })) { yield Storage.read(item) } } diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index d326976f1ae..5e9f4cc9d88 100644 --- 
a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -580,11 +580,10 @@ export namespace MessageV2 { export const parts = fn(Identifier.schema("message"), async (messageID) => { const result = [] as MessageV2.Part[] - for (const item of await Storage.list(["part", messageID])) { + for (const item of await Storage.list(["part", messageID], { orderBy: "id" })) { const read = await Storage.read(item) result.push(read) } - result.sort((a, b) => (a.id > b.id ? 1 : -1)) return result }) diff --git a/packages/opencode/src/storage/json-provider.ts b/packages/opencode/src/storage/json-provider.ts new file mode 100644 index 00000000000..3db81d0de17 --- /dev/null +++ b/packages/opencode/src/storage/json-provider.ts @@ -0,0 +1,112 @@ +import { Log } from "../util/log" +import path from "path" +import fs from "fs/promises" +import { Global } from "../global" +import { Filesystem } from "../util/filesystem" +import { Lock } from "../util/lock" +import { StorageProvider } from "./provider" + +const log = Log.create({ service: "storage:json" }) + +export class JsonStorageProvider implements StorageProvider.Interface { + constructor(private dir: string) {} + + async read(key: string[]): Promise { + const target = path.join(this.dir, ...key) + ".json" + return this.withErrorHandling(async () => { + using _ = await Lock.read(target) + const result = await Bun.file(target).json() + return result as T + }) + } + + async write(key: string[], content: T): Promise { + const target = path.join(this.dir, ...key) + ".json" + return this.withErrorHandling(async () => { + using _ = await Lock.write(target) + await Bun.write(target, JSON.stringify(content, null, 2)) + }) + } + + async update(key: string[], fn: (draft: T) => void): Promise { + const target = path.join(this.dir, ...key) + ".json" + return this.withErrorHandling(async () => { + using _ = await Lock.write(target) + const content = await Bun.file(target).json() + fn(content) + await 
Bun.write(target, JSON.stringify(content, null, 2)) + return content as T + }) + } + + async remove(key: string[]): Promise { + const target = path.join(this.dir, ...key) + ".json" + return this.withErrorHandling(async () => { + await fs.unlink(target).catch(() => {}) + }) + } + + async list(prefix: string[], options?: StorageProvider.ListOptions): Promise { + const glob = new Bun.Glob("**/*") + try { + const result = await Array.fromAsync( + glob.scan({ + cwd: path.join(this.dir, ...prefix), + onlyFiles: true, + }), + ).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) + result.sort() + + // Apply orderBy if specified (requires reading data) + if (options?.orderBy) { + const { field, desc } = this.parseOrderBy(options.orderBy) + const withData = await Promise.all( + result.map(async (key) => ({ + key, + data: await this.read(key), + })), + ) + withData.sort((a, b) => { + const aVal = this.getNestedValue(a.data, field) + const bVal = this.getNestedValue(b.data, field) + const cmp = aVal > bVal ? 1 : aVal < bVal ? -1 : 0 + return desc ? -cmp : cmp + }) + const sorted = withData.map((x) => x.key) + return options.limit ? sorted.slice(0, options.limit) : sorted + } + + // Apply limit without orderBy + return options?.limit ? result.slice(0, options.limit) : result + } catch { + return [] + } + } + + private parseOrderBy(orderBy: string): { field: string; desc: boolean } { + const desc = orderBy.startsWith("-") + const field = desc ? 
orderBy.slice(1) : orderBy + return { field, desc } + } + + private getNestedValue(obj: any, path: string): any { + const parts = path.split(".") + let current = obj + for (const part of parts) { + if (current === undefined || current === null) return undefined + current = current[part] + } + return current + } + + private async withErrorHandling(body: () => Promise): Promise { + return body().catch((e) => { + if (!(e instanceof Error)) throw e + const errnoException = e as NodeJS.ErrnoException + if (errnoException.code === "ENOENT") { + throw new StorageProvider.NotFoundError({ message: `Resource not found: ${errnoException.path}` }) + } + throw e + }) + } +} diff --git a/packages/opencode/src/storage/provider.ts b/packages/opencode/src/storage/provider.ts new file mode 100644 index 00000000000..1ac6378d4e1 --- /dev/null +++ b/packages/opencode/src/storage/provider.ts @@ -0,0 +1,52 @@ +import { NamedError } from "@opencode-ai/util/error" +import z from "zod" + +export namespace StorageProvider { + export const NotFoundError = NamedError.create( + "NotFoundError", + z.object({ + message: z.string(), + }), + ) + + export interface ListOptions { + /** + * Order by field (dot notation supported, e.g. "time.updated") + * Direction: prepend "-" for descending (e.g. 
"-time.updated") + */ + orderBy?: string + /** + * Limit number of results + */ + limit?: number + } + + export interface Interface { + /** + * Read data from storage + * @throws NotFoundError if not found + */ + read(key: string[]): Promise + + /** + * Write data to storage + */ + write(key: string[], content: T): Promise + + /** + * Update data in storage + * @throws NotFoundError if not found + */ + update(key: string[], fn: (draft: T) => void): Promise + + /** + * Remove data from storage + */ + remove(key: string[]): Promise + + /** + * List keys with given prefix + */ + list(prefix: string[], options?: ListOptions): Promise + } +} diff --git a/packages/opencode/src/storage/sqlite-provider.ts b/packages/opencode/src/storage/sqlite-provider.ts new file mode 100644 index 00000000000..00a8bcaf2de --- /dev/null +++ b/packages/opencode/src/storage/sqlite-provider.ts @@ -0,0 +1,286 @@ +import { Database } from "bun:sqlite" +import { StorageProvider } from "./provider" +import { Log } from "../util/log" +import path from "path" +import z from "zod" + +const log = Log.create({ service: "storage:sqlite" }) + +// Shared entity types +export const ENTITY_TYPES = ["message", "part", "session", "project", "todo", "session_diff", "session_share"] as const +export type EntityType = (typeof ENTITY_TYPES)[number] + +// Config schema - shared with sqlite-storage.ts +const ColumnSchema = z.record(z.string(), z.string()) + +const TableConfigSchema = z.object({ + columns: ColumnSchema, + extract: z.array(z.string()).optional(), + indices: z.array(z.string()).optional(), +}) + +export const SqliteConfigSchema = z.object({ + database: z.string(), + tables: z + .object({ + message: TableConfigSchema.optional(), + part: TableConfigSchema.optional(), + session: TableConfigSchema.optional(), + project: TableConfigSchema.optional(), + todo: TableConfigSchema.optional(), + session_diff: TableConfigSchema.optional(), + session_share: TableConfigSchema.optional(), + }) + .optional(), +}) + 
+export type SqliteConfig = z.infer +export type TableConfig = z.infer + +export class SqliteStorageProvider implements StorageProvider.Interface { + private db: Database + private config: SqliteConfig + + constructor(dbPath: string, config: SqliteConfig) { + this.db = new Database(dbPath) + this.config = config + log.info("Initialized SQLite storage", { dbPath }) + } + + async read(key: string[]): Promise { + const [entity, ...rest] = key + const id = rest[rest.length - 1] + const entityType = entity as EntityType + + const tableConfig = this.config.tables?.[entityType] + if (!tableConfig) { + throw new StorageProvider.NotFoundError({ message: `No table config for ${entity}` }) + } + + const row = this.db.prepare(`SELECT * FROM ${entity} WHERE id = ?`).get(id) as any + + if (!row) { + throw new StorageProvider.NotFoundError({ message: `Resource not found: ${key.join("/")}` }) + } + + return this.mapFromRow(row, tableConfig, entity) as T + } + + async write(key: string[], content: T): Promise { + const [entity, ...rest] = key + const id = rest[rest.length - 1] + const entityType = entity as EntityType + + const tableConfig = this.config.tables?.[entityType] + if (!tableConfig) { + log.warn(`No table config for ${entity}, skipping write`) + return + } + + const row = this.mapToRow(content, tableConfig, key) + + const columns = Object.keys(row) + .map((col) => `\`${col}\``) + .join(", ") + const placeholders = Object.keys(row) + .map(() => "?") + .join(", ") + + this.db.prepare(`INSERT OR REPLACE INTO ${entity} (${columns}) VALUES (${placeholders})`).run(...Object.values(row)) + } + + async update(key: string[], fn: (draft: T) => void): Promise { + // Read current value + const current = await this.read(key) + + // Mutate it + fn(current) + + // Write it back + await this.write(key, current) + + return current + } + + async remove(key: string[]): Promise { + const [entity, ...rest] = key + const id = rest[rest.length - 1] + + this.db.prepare(`DELETE FROM ${entity} 
WHERE id = ?`).run(id) + } + + async list(prefix: string[], options?: StorageProvider.ListOptions): Promise { + const [entity, ...rest] = prefix + const entityType = entity as EntityType + + const tableConfig = this.config.tables?.[entityType] + if (!tableConfig) { + return [] + } + + // Build ORDER BY clause + let orderByClause = "" + if (options?.orderBy) { + const { field, desc } = this.parseOrderBy(options.orderBy) + // Check if field is extracted, otherwise order by id + const isExtracted = tableConfig.extract?.includes(field) + const orderField = isExtracted ? `\`${field}\`` : "id" + orderByClause = ` ORDER BY ${orderField} ${desc ? "DESC" : "ASC"}` + } else { + // Default: order by id + orderByClause = " ORDER BY id ASC" + } + + // Build LIMIT clause + const limitClause = options?.limit ? ` LIMIT ${options.limit}` : "" + + // Handle filtering based on prefix + let rows: any[] + + if (rest.length === 0) { + // No filter - get all entities + rows = this.db.prepare(`SELECT * FROM ${entity}${orderByClause}${limitClause}`).all() as any[] + } else { + // Filter based on entity type and prefix + switch (entityType) { + case "message": + // prefix: ["message", sessionID] + if (rest[0]) { + rows = this.db + .prepare(`SELECT * FROM message WHERE sessionID = ?${orderByClause}${limitClause}`) + .all(rest[0]) as any[] + } else { + rows = this.db.prepare(`SELECT * FROM message${orderByClause}${limitClause}`).all() as any[] + } + break + case "part": + // prefix: ["part", messageID] + if (rest[0]) { + rows = this.db + .prepare(`SELECT * FROM part WHERE messageID = ?${orderByClause}${limitClause}`) + .all(rest[0]) as any[] + } else { + rows = this.db.prepare(`SELECT * FROM part${orderByClause}${limitClause}`).all() as any[] + } + break + case "session": + // prefix: ["session", projectID] + if (rest[0]) { + rows = this.db + .prepare(`SELECT * FROM session WHERE projectID = ?${orderByClause}${limitClause}`) + .all(rest[0]) as any[] + } else { + rows = this.db.prepare(`SELECT 
* FROM session${orderByClause}${limitClause}`).all() as any[] + } + break + case "todo": + // prefix: ["todo", sessionID] + if (rest[0]) { + rows = this.db + .prepare(`SELECT * FROM todo WHERE sessionID = ?${orderByClause}${limitClause}`) + .all(rest[0]) as any[] + } else { + rows = this.db.prepare(`SELECT * FROM todo${orderByClause}${limitClause}`).all() as any[] + } + break + case "session_diff": + // session_diff has no parent filtering + rows = this.db.prepare(`SELECT * FROM session_diff${orderByClause}${limitClause}`).all() as any[] + break + case "session_share": + // session_share has no parent filtering + rows = this.db.prepare(`SELECT * FROM session_share${orderByClause}${limitClause}`).all() as any[] + break + case "project": + // Projects don't have parent filtering + rows = this.db.prepare(`SELECT * FROM project${orderByClause}${limitClause}`).all() as any[] + break + default: + rows = [] + } + } + + return rows.map((row) => this.getStoragePathFromRow(entityType, row)) + } + + private parseOrderBy(orderBy: string): { field: string; desc: boolean } { + const desc = orderBy.startsWith("-") + const field = desc ? 
orderBy.slice(1) : orderBy + return { field, desc } + } + + close() { + this.db.close() + } + + // Mapping logic + + private mapToRow(data: any, config: TableConfig, storagePath: string[]): Record { + const row: Record = {} + + // Extract ID from storage path + row.id = storagePath[storagePath.length - 1] + + // Extract specified fields to columns + if (config.extract) { + for (const field of config.extract) { + const value = this.getNestedValue(data, field) + if (value !== undefined) { + row[field] = this.serializeValue(value) + } + } + } + + // Store COMPLETE object in data blob (no deletion for simplicity) + // This means extracted fields are stored twice (column + blob) but avoids reconstruction bugs + row.data = JSON.stringify(data) + + return row + } + + private mapFromRow(row: any, config: TableConfig, entity?: string): any { + // Parse complete object from JSON blob + // Since we store the complete object, no reconstruction needed + return row.data ? JSON.parse(row.data) : {} + } + + private getStoragePathFromRow(entity: EntityType, row: any): string[] { + switch (entity) { + case "message": + return ["message", row.sessionID, row.id] + case "part": + return ["part", row.messageID, row.id] + case "session": + return ["session", row.projectID, row.id] + case "project": + return ["project", row.id] + case "todo": + return ["todo", row.sessionID, row.id] + case "session_diff": + return ["session_diff", row.id] + case "session_share": + return ["session_share", row.id] + default: + return [] + } + } + + private getNestedValue(obj: any, path: string): any { + const parts = path.split(".") + let current = obj + + for (const part of parts) { + if (current === undefined || current === null) return undefined + current = current[part] + } + + return current + } + + private serializeValue(value: any): any { + if (typeof value === "object" && value !== null) { + return JSON.stringify(value) + } + return value + } +} diff --git a/packages/opencode/src/storage/storage.ts 
b/packages/opencode/src/storage/storage.ts index 18f2d67e7ac..12e8bc0853f 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -1,25 +1,23 @@ import { Log } from "../util/log" import path from "path" -import fs from "fs/promises" import { Global } from "../global" import { Filesystem } from "../util/filesystem" import { lazy } from "../util/lazy" -import { Lock } from "../util/lock" import { $ } from "bun" -import { NamedError } from "@opencode-ai/util/error" -import z from "zod" +import { StorageProvider } from "./provider" +import { JsonStorageProvider } from "./json-provider" +import { SqliteStorageProvider, SqliteConfigSchema } from "./sqlite-provider" export namespace Storage { const log = Log.create({ service: "storage" }) type Migration = (dir: string) => Promise - export const NotFoundError = NamedError.create( - "NotFoundError", - z.object({ - message: z.string(), - }), - ) + // Re-export NotFoundError from provider + export const NotFoundError = StorageProvider.NotFoundError + + // Provider instance + let provider: StorageProvider.Interface | null = null const MIGRATIONS: Migration[] = [ async (dir) => { @@ -158,70 +156,154 @@ export namespace Storage { } }) + /** + * Initialize storage provider based on config + */ + export async function init() { + // Load config directly from file to avoid circular dependency with Config.state() + // which depends on Instance which depends on Storage + const { Global } = await import("../global") + const configPath = path.join(Global.Path.config, "opencode.json") + + let config: any = {} + if (await Bun.file(configPath).exists()) { + config = await Bun.file(configPath).json() + } + + const backend = config.storage?.backend || "json" + + if (backend === "sqlite") { + log.info("Initializing SQLite storage backend") + + // Load sqlite config + const sqliteConfigPath = config.storage?.sqlite?.config || path.join(Global.Path.config, "sqlite-storage.json") + + let 
sqliteConfig: any + if (await Bun.file(sqliteConfigPath).exists()) { + const json = await Bun.file(sqliteConfigPath).json() + sqliteConfig = SqliteConfigSchema.parse(json) + } else { + // Use default database path from config or default with minimal schema + sqliteConfig = { + database: config.storage?.sqlite?.database || path.join(Global.Path.data, "storage.db"), + tables: { + message: { + columns: { + id: "TEXT PRIMARY KEY", + sessionID: "TEXT", + role: "TEXT", + "time.created": "INTEGER", + data: "TEXT", + }, + extract: ["sessionID", "role", "time.created"], + indices: ["sessionID", "time.created"], + }, + part: { + columns: { id: "TEXT PRIMARY KEY", messageID: "TEXT", type: "TEXT", data: "TEXT" }, + extract: ["messageID", "type"], + indices: ["messageID"], + }, + session: { + columns: { + id: "TEXT PRIMARY KEY", + projectID: "TEXT", + title: "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + data: "TEXT", + }, + extract: ["projectID", "title", "time.updated"], + indices: ["projectID", "time.updated"], + }, + project: { + columns: { + id: "TEXT PRIMARY KEY", + worktree: "TEXT", + vcs: "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + data: "TEXT", + }, + extract: ["worktree", "vcs", "time.created", "time.updated"], + indices: ["worktree", "time.created", "time.updated"], + }, + todo: { + columns: { id: "TEXT PRIMARY KEY", sessionID: "TEXT", data: "TEXT" }, + extract: ["sessionID"], + indices: ["sessionID"], + }, + session_diff: { + columns: { id: "TEXT PRIMARY KEY", data: "TEXT" }, + extract: [], + indices: [], + }, + session_share: { + columns: { id: "TEXT PRIMARY KEY", secret: "TEXT", url: "TEXT", data: "TEXT" }, + extract: ["secret", "url"], + indices: [], + }, + }, + } + log.warn("No SQLite config found, using minimal schema", { + path: sqliteConfigPath, + database: sqliteConfig.database, + }) + } + + const dbPath = sqliteConfig.database.replace(/^~/, Global.Path.home) + const resolvedDbPath = path.resolve(dbPath) + + 
provider = new SqliteStorageProvider(resolvedDbPath, sqliteConfig) + log.info("SQLite storage provider initialized", { dbPath: resolvedDbPath }) + } else { + log.info("Using default JSON storage backend") + // Initialize JSON provider immediately + const { dir } = await state() + provider = new JsonStorageProvider(dir) + log.info("JSON storage provider initialized", { dir }) + } + } + + /** + * Set the storage provider (JsonStorageProvider or SqliteStorageProvider) + */ + export function setProvider(p: StorageProvider.Interface) { + provider = p + log.info("Storage provider set", { provider: p.constructor.name }) + } + + /** + * Get the current storage provider, initializing with JsonStorageProvider if needed + */ + async function getProvider(): Promise { + if (!provider) { + throw new Error("Storage provider not initialized. Call Storage.init() first.") + } + return provider + } + export async function remove(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - await fs.unlink(target).catch(() => {}) - }) + const p = await getProvider() + return p.remove(key) } export async function read(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.read(target) - const result = await Bun.file(target).json() - return result as T - }) + const p = await getProvider() + return p.read(key) } export async function update(key: string[], fn: (draft: T) => void) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - const content = await Bun.file(target).json() - fn(content) - await Bun.write(target, JSON.stringify(content, null, 2)) - return content as T - }) + const p = await getProvider() + return p.update(key, fn) } export async 
function write<T>(key: string[], content: T) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - await Bun.write(target, JSON.stringify(content, null, 2)) - }) + const p = await getProvider() + return p.write(key, content) } - async function withErrorHandling<T>(body: () => Promise<T>) { - return body().catch((e) => { - if (!(e instanceof Error)) throw e - const errnoException = e as NodeJS.ErrnoException - if (errnoException.code === "ENOENT") { - throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` }) - } - throw e - }) - } - - const glob = new Bun.Glob("**/*") - export async function list(prefix: string[]) { - const dir = await state().then((x) => x.dir) - try { - const result = await Array.fromAsync( - glob.scan({ - cwd: path.join(dir, ...prefix), - onlyFiles: true, - }), - ).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) - result.sort() - return result - } catch { - return [] - } + export async function list(prefix: string[], options?: StorageProvider.ListOptions) { + const p = await getProvider() + return p.list(prefix, options) } } diff --git a/packages/opencode/src/util/sqlite-storage.ts b/packages/opencode/src/util/sqlite-storage.ts new file mode 100644 index 00000000000..49aa7c7f0dc --- /dev/null +++ b/packages/opencode/src/util/sqlite-storage.ts @@ -0,0 +1,409 @@ +import { Database } from "bun:sqlite" +import { Global } from "../global" +import { Storage } from "../storage/storage" +import path from "path" +import { Log } from "./log" +import { + SqliteConfigSchema, + type SqliteConfig, + type TableConfig, + type EntityType, + ENTITY_TYPES, +} from "../storage/sqlite-provider" + +const log = Log.create({ service: "sqlite-storage" }) + +const DEFAULT_CONFIG: SqliteConfig = { + database: "~/.local/share/opencode/storage.db", + tables: { + message: { + columns: { + id: "TEXT 
PRIMARY KEY", + sessionID: "TEXT", + role: "TEXT", + "time.created": "INTEGER", + data: "TEXT", + }, + extract: ["sessionID", "role", "time.created"], + indices: ["sessionID", "time.created"], + }, + part: { + columns: { + id: "TEXT PRIMARY KEY", + messageID: "TEXT", + type: "TEXT", + data: "TEXT", + }, + extract: ["messageID", "type"], + indices: ["messageID"], + }, + session: { + columns: { + id: "TEXT PRIMARY KEY", + projectID: "TEXT", + title: "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + data: "TEXT", + }, + extract: ["projectID", "title", "time.updated"], + indices: ["projectID", "time.updated"], + }, + project: { + columns: { + id: "TEXT PRIMARY KEY", + worktree: "TEXT", + vcs: "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + data: "TEXT", + }, + extract: ["worktree", "vcs", "time.created", "time.updated"], + indices: ["worktree", "time.created", "time.updated"], + }, + todo: { + columns: { + id: "TEXT PRIMARY KEY", + sessionID: "TEXT", + data: "TEXT", + }, + extract: ["sessionID"], + indices: ["sessionID"], + }, + session_diff: { + columns: { id: "TEXT PRIMARY KEY", data: "TEXT" }, + extract: [], + indices: [], + }, + session_share: { + columns: { id: "TEXT PRIMARY KEY", secret: "TEXT", url: "TEXT", data: "TEXT" }, + extract: ["secret", "url"], + indices: [], + }, + }, +} + +export namespace SqliteStorage { + export async function create(configPath?: string) { + const result = await loadConfig(configPath) + return new StorageImpl(result.config, result.configPath) + } + + async function loadConfig(configPath?: string): Promise<{ config: SqliteConfig; configPath: string }> { + const defaultPath = path.join(Global.Path.config, "sqlite-storage.json") + const target = configPath ? 
path.resolve(configPath) : defaultPath + + if (await Bun.file(target).exists()) { + const json = await Bun.file(target).json() + return { + config: SqliteConfigSchema.parse(json), + configPath: target, + } + } + + // Write default config if it doesn't exist + if (!configPath) { + await Bun.write(target, JSON.stringify(DEFAULT_CONFIG, null, 2)) + log.info("Created default sqlite-storage.json", { path: target }) + } + + return { + config: DEFAULT_CONFIG, + configPath: target, + } + } + + class StorageImpl { + private db: Database | null = null + private config: SqliteConfig + private configPathUsed: string + + constructor(config: SqliteConfig, configPath: string) { + this.config = config + this.configPathUsed = configPath + } + + dbPath(): string { + const dbPath = this.config.database.replace(/^~/, Global.Path.home) + return path.resolve(dbPath) + } + + configPath(): string { + return this.configPathUsed + } + + private getDB(): Database { + if (!this.db) { + this.db = new Database(this.dbPath()) + } + return this.db + } + + async init() { + const db = this.getDB() + + // Create tables based on config + for (const [tableName, tableConfig] of Object.entries(this.config.tables ?? {})) { + if (!tableConfig) continue + + const columns = Object.entries(tableConfig.columns) + .map(([name, type]) => `\`${name}\` ${type}`) + .join(", ") + + db.run(`DROP TABLE IF EXISTS ${tableName}`) + db.run(`CREATE TABLE ${tableName} (${columns})`) + + // Create indices + if (tableConfig.indices) { + for (const column of tableConfig.indices) { + db.run(`CREATE INDEX idx_${tableName}_${column.replace(/\./g, "_")} ON ${tableName}(\`${column}\`)`) + } + } + + log.info(`Created table ${tableName}`, { + columns: Object.keys(tableConfig.columns), + indices: tableConfig.indices, + }) + } + } + + async importFromJSON(options: { + entity?: EntityType + verbose?: boolean + onProgress?: (entity: EntityType, count: number) => void + }) { + const entities: EntityType[] = options.entity ? 
[options.entity] : [...ENTITY_TYPES] + + const result = { + message: 0, + part: 0, + session: 0, + project: 0, + todo: 0, + session_diff: 0, + session_share: 0, + } + + for (const entity of entities) { + const count = await this.importEntity(entity, options.onProgress) + result[entity] = count + } + + return result + } + + private async importEntity(entity: EntityType, onProgress?: (entity: EntityType, count: number) => void) { + const tableConfig = this.config.tables?.[entity] + if (!tableConfig) { + log.warn(`No table config for ${entity}, skipping`) + return 0 + } + + const db = this.getDB() + let count = 0 + + // Get storage paths based on entity type + log.info(`Getting storage paths for ${entity}`) + const paths = await this.getStoragePaths(entity) + log.info(`Found ${paths.length} ${entity} records to import`) + + for (const storagePath of paths) { + try { + const data = await Storage.read(storagePath) + const row = this.mapToRow(data, tableConfig, storagePath) + + const columns = Object.keys(row) + .map((col) => `\`${col}\``) + .join(", ") + const placeholders = Object.keys(row) + .map(() => "?") + .join(", ") + + const stmt = db.prepare(`INSERT OR REPLACE INTO ${entity} (${columns}) VALUES (${placeholders})`) + stmt.run(...Object.values(row)) + + count++ + if (onProgress && count % 100 === 0) { + onProgress(entity, count) + } + } catch (error) { + log.error(`Failed to import ${entity}`, { path: storagePath, error }) + } + } + + return count + } + + async exportToJSON(options: { + entity?: EntityType + verbose?: boolean + force?: boolean + onProgress?: (entity: EntityType, count: number) => void + }) { + const entities: EntityType[] = options.entity ? [options.entity] : [...ENTITY_TYPES] + + const result = { + message: 0, + part: 0, + session: 0, + project: 0, + todo: 0, + session_diff: 0, + session_share: 0, + } + + for (const entity of entities) { + const count = await this.exportEntity(entity, options.force ?? 
false, options.onProgress) + result[entity] = count + } + + return result + } + + private async exportEntity( + entity: EntityType, + force: boolean, + onProgress?: (entity: EntityType, count: number) => void, + ) { + const tableConfig = this.config.tables?.[entity] + if (!tableConfig) { + log.warn(`No table config for ${entity}, skipping`) + return 0 + } + + const db = this.getDB() + const rows = db.prepare(`SELECT * FROM ${entity}`).all() as any[] + + // Get JSON storage directory + const jsonStorageDir = path.join(Global.Path.data, "storage") + + let count = 0 + for (const row of rows) { + try { + const data = this.mapFromRow(row, tableConfig) + const storagePath = this.getStoragePathFromRow(entity, row) + + // Construct file path for JSON storage + const filePath = path.join(jsonStorageDir, ...storagePath) + ".json" + const fileDir = path.dirname(filePath) + + // Create directory if it doesn't exist + await Bun.$`mkdir -p ${fileDir}`.quiet() + + // Check if file exists and force flag + if (!force && (await Bun.file(filePath).exists())) { + log.warn(`File already exists, skipping (use --force to overwrite)`, { path: filePath }) + continue + } + + // Write directly to JSON file + await Bun.file(filePath).write(JSON.stringify(data, null, 2)) + + count++ + if (onProgress && count % 100 === 0) { + onProgress(entity, count) + } + } catch (error) { + log.error(`Failed to export ${entity}`, { row, error }) + } + } + + return count + } + + private mapToRow(data: any, config: TableConfig, storagePath: string[]): Record<string, any> { + const row: Record<string, any> = {} + + // Extract ID from storage path + row.id = storagePath[storagePath.length - 1] + + // Extract specified fields to columns + if (config.extract) { + for (const field of config.extract) { + const value = this.getNestedValue(data, field) + if (value !== undefined) { + row[field] = this.serializeValue(value) + } + } + } + + // Store COMPLETE object in data blob (no deletion for simplicity) + row.data = JSON.stringify(data) + 
return row + } + + private mapFromRow(row: any, config: TableConfig): any { + // Parse complete object from JSON blob + return row.data ? JSON.parse(row.data) : {} + } + + private async getStoragePaths(entity: EntityType): Promise<string[][]> { + switch (entity) { + case "message": + return await Storage.list(["message"]) + case "part": + return await Storage.list(["part"]) + case "session": + return await Storage.list(["session"]) + case "project": + return await Storage.list(["project"]) + case "todo": + return await Storage.list(["todo"]) + case "session_diff": + return await Storage.list(["session_diff"]) + case "session_share": + return await Storage.list(["session_share"]) + default: + return [] + } + } + + private getStoragePathFromRow(entity: EntityType, row: any): string[] { + switch (entity) { + case "message": + return ["message", row.sessionID, row.id] + case "part": + return ["part", row.messageID, row.id] + case "session": + return ["session", row.projectID, row.id] + case "project": + return ["project", row.id] + case "todo": + return ["todo", row.sessionID, row.id] + case "session_diff": + return ["session_diff", row.id] + case "session_share": + return ["session_share", row.id] + default: + return [] + } + } + + private getNestedValue(obj: any, path: string): any { + const parts = path.split(".") + let current = obj + + for (const part of parts) { + if (current === undefined || current === null) return undefined + current = current[part] + } + + return current + } + + private serializeValue(value: any): any { + if (typeof value === "object" && value !== null) { + return JSON.stringify(value) + } + return value + } + + close() { + if (this.db) { + this.db.close() + this.db = null + } + } + } +} diff --git a/sqlite-storage.example.json b/sqlite-storage.example.json new file mode 100644 index 00000000000..dd78534be45 --- /dev/null +++ b/sqlite-storage.example.json @@ -0,0 +1,77 @@ +{ + "database": "~/.local/share/opencode/storage.db", + "tables": { + "message": { 
+ "columns": { + "id": "TEXT PRIMARY KEY", + "sessionID": "TEXT", + "role": "TEXT", + "time.created": "INTEGER", + "data": "TEXT" + }, + "extract": ["sessionID", "role", "time.created"], + "indices": ["sessionID", "time.created"] + }, + "part": { + "columns": { + "id": "TEXT PRIMARY KEY", + "messageID": "TEXT", + "type": "TEXT", + "data": "TEXT" + }, + "extract": ["messageID", "type"], + "indices": ["messageID"] + }, + "session": { + "columns": { + "id": "TEXT PRIMARY KEY", + "projectID": "TEXT", + "title": "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + "data": "TEXT" + }, + "extract": ["projectID", "title", "time.updated"], + "indices": ["projectID", "time.updated"] + }, + "project": { + "columns": { + "id": "TEXT PRIMARY KEY", + "worktree": "TEXT", + "vcs": "TEXT", + "time.created": "INTEGER", + "time.updated": "INTEGER", + "data": "TEXT" + }, + "extract": ["worktree", "vcs", "time.created", "time.updated"], + "indices": ["worktree", "time.created", "time.updated"] + }, + "todo": { + "columns": { + "id": "TEXT PRIMARY KEY", + "sessionID": "TEXT", + "data": "TEXT" + }, + "extract": ["sessionID"], + "indices": ["sessionID"] + }, + "session_diff": { + "columns": { + "id": "TEXT PRIMARY KEY", + "data": "TEXT" + }, + "extract": [], + "indices": [] + }, + "session_share": { + "columns": { + "id": "TEXT PRIMARY KEY", + "secret": "TEXT", + "url": "TEXT", + "data": "TEXT" + }, + "extract": ["secret", "url"], + "indices": [] + } + } +}