diff --git a/bun.lock b/bun.lock index 10ba56f..6f341f7 100644 --- a/bun.lock +++ b/bun.lock @@ -14,14 +14,14 @@ }, "packages/accounts": { "name": "@memory.build/accounts", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@pydantic/logfire-node": "^0.13.1", }, }, "packages/cli": { "name": "@memory.build/cli", - "version": "0.2.5", + "version": "0.2.6", "bin": { "me": "./index.ts", }, @@ -38,7 +38,7 @@ }, "packages/client": { "name": "@memory.build/client", - "version": "0.2.5", + "version": "0.2.6", "dependencies": { "@memory.build/protocol": "workspace:*", }, @@ -49,6 +49,13 @@ "typescript", ], }, + "packages/core": { + "name": "@memory.build/core", + "version": "0.2.5", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1", + }, + }, "packages/docs-site": { "name": "@memory.build/docs-site", "version": "0.0.0", @@ -83,7 +90,7 @@ }, "packages/embedding": { "name": "@memory.build/embedding", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@ai-sdk/openai": "^3.0.0", "@pydantic/logfire-node": "^0.13.1", @@ -92,14 +99,14 @@ }, "packages/engine": { "name": "@memory.build/engine", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@pydantic/logfire-node": "^0.13.1", }, }, "packages/protocol": { "name": "@memory.build/protocol", - "version": "0.2.5", + "version": "0.2.6", "dependencies": { "zod": "^4.0.0", }, @@ -112,7 +119,7 @@ }, "packages/server": { "name": "memory-engine-server", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@memory.build/accounts": "workspace:*", "@memory.build/embedding": "workspace:*", @@ -123,6 +130,13 @@ "zod": "^4.0.0", }, }, + "packages/space": { + "name": "@memory.build/space", + "version": "0.2.5", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1", + }, + }, "packages/web": { "name": "@memory.build/web", "version": "0.1.17", @@ -153,7 +167,7 @@ }, "packages/worker": { "name": "@memory.build/worker", - "version": "0.2.0", + "version": "0.2.5", "dependencies": { "@memory.build/embedding": "workspace:*", "@memory.build/engine": "workspace:*", @@ -163,7 +177,9 @@ "scripts": { "name": "scripts", "dependencies": { + "@memory.build/core": "workspace:*", "@memory.build/embedding": "workspace:*", + "@memory.build/space": "workspace:*", "yaml": "^2.7.0", }, }, @@ -369,6 +385,8 @@ "@memory.build/client": ["@memory.build/client@workspace:packages/client"], + "@memory.build/core": ["@memory.build/core@workspace:packages/core"], + "@memory.build/docs-site": ["@memory.build/docs-site@workspace:packages/docs-site"], "@memory.build/embedding": ["@memory.build/embedding@workspace:packages/embedding"], @@ -377,6 +395,8 @@ "@memory.build/protocol": ["@memory.build/protocol@workspace:packages/protocol"], + "@memory.build/space": ["@memory.build/space@workspace:packages/space"], + "@memory.build/web": ["@memory.build/web@workspace:packages/web"], "@memory.build/worker": ["@memory.build/worker@workspace:packages/worker"], diff --git a/package.json b/package.json index 29e8f2f..7897c2a 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "scripts": { "build": "./bun run --filter '@memory.build/cli' build", "build:all": "./bun scripts/build-all.ts", - "check": "./bun i --silent && ./bun run typecheck && ./bun run lint --write && ./bun run test --only-failures", + "check": "./bun i --silent && ./bun scripts/bundle-web-assets.ts && ./bun run typecheck && ./bun run lint --write && ./bun run test --only-failures", "clean": "rm -rf packages/cli/dist dist", "docs": "./bun --filter @memory.build/docs-site dev", "docs:build": "./bun --filter @memory.build/docs-site build", @@ -21,6 +21,7 @@ "install:local": "./bun scripts/install-local.ts", "lint": "biome check", "me": "./bun run packages/cli/index.ts", + "migrate:db": "./bun scripts/migrate-db.ts", "pg": "./bun run pg:build && docker run -d --name me-postgres -e POSTGRES_HOST_AUTH_METHOD=trust -p 127.0.0.1:5432:5432 me-postgres", "pg:build": "docker build -t me-postgres -f docker/Dockerfile.postgres docker/", "pg:rm": "docker rm -f me-postgres", diff --git a/packages/core/index.ts b/packages/core/index.ts new file mode 100644 index 0000000..fc637ec --- /dev/null +++ b/packages/core/index.ts @@ -0,0 +1,2 @@ +export { type MigrateCoreOptions, migrateCore } from "./migrate/migrate"; +export { CORE_SCHEMA_VERSION } from "./version"; diff --git a/packages/core/migrate/idempotent/000_update.sql b/packages/core/migrate/idempotent/000_update.sql new file mode 100644 index 0000000..53a4ce6 --- /dev/null +++ b/packages/core/migrate/idempotent/000_update.sql @@ -0,0 +1,35 @@ +-- generic trigger function to update updated_at timestamp +create or replace function core.update_updated_at() +returns trigger +as $func$ +begin + new.updated_at = pg_catalog.now(); + return new; +end; +$func$ language plpgsql volatile security definer +set search_path to core, pg_temp; + +create or replace trigger space_before_update_trg +before update on core.space +for each row +execute function core.update_updated_at(); + +create or replace trigger principal_before_update_trg +before update on core.principal +for each row +execute function core.update_updated_at(); + +create or replace trigger principal_space_before_update_trg +before update on core.principal_space +for each row +execute function core.update_updated_at(); + +create or replace trigger group_member_before_update_trg +before update on core.group_member +for each row +execute function core.update_updated_at(); + +create or replace trigger tree_access_before_update_trg +before update on core.tree_access +for each row +execute function core.update_updated_at(); diff --git a/packages/core/migrate/idempotent/001_principal_space.sql b/packages/core/migrate/idempotent/001_principal_space.sql new file mode 100644 index 0000000..b221c4b --- /dev/null +++ b/packages/core/migrate/idempotent/001_principal_space.sql @@ -0,0 +1,41 @@ +------------------------------------------------------------------------------- +-- is_principal_in_space +------------------------------------------------------------------------------- +create or replace function core.is_principal_in_space +( _principal_id uuid +, _space_id uuid +) +returns bool +as $func$ + select exists + ( + select 1 + from core.principal_space ps + where ps.principal_id = _principal_id + and ps.space_id = _space_id + ) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- is_principal_space_admin +------------------------------------------------------------------------------- +create or replace function core.is_principal_space_admin +( _principal_id uuid +, _space_id uuid +) +returns bool +as $func$ + select coalesce + ( + ( + select ps.admin and (not p.kind = 'a') -- agents cannot be space admins + from core.principal_space ps + inner join core.principal p on (ps.principal_id = p.id) + where ps.principal_id = _principal_id + and ps.space_id = _space_id + ) + , false + ) +$func$ language sql stable security invoker +; diff --git a/packages/core/migrate/idempotent/002_group_member.sql b/packages/core/migrate/idempotent/002_group_member.sql new file mode 100644 index 0000000..f7e32c7 --- /dev/null +++ b/packages/core/migrate/idempotent/002_group_member.sql @@ -0,0 +1,26 @@ + +------------------------------------------------------------------------------- +-- member_groups +------------------------------------------------------------------------------- +create or replace function core.member_groups +( _member_id uuid +, _space_id uuid +) +returns table +( group_id uuid +, admin bool +) +as $func$ + select + gm.group_id + , gm.admin and (not m.kind = 'a') -- agent's cannot be group admins + from core.principal m -- the member + -- assert the member belongs to the space + inner join core.principal_space psm on (m.id = psm.principal_id and psm.space_id = _space_id) + -- find the groups the member belongs to in the space + inner join core.group_member gm on (m.member_id = gm.member_id and gm.space_id = _space_id) + -- assert the group belongs to the space + inner join core.principal_space psg on (gm.group_id = psg.principal_id and psg.space_id = _space_id) + where m.member_id = _member_id -- the member +$func$ language sql stable security invoker +; diff --git a/packages/core/migrate/idempotent/003_tree_access.sql b/packages/core/migrate/idempotent/003_tree_access.sql new file mode 100644 index 0000000..ad804a5 --- /dev/null +++ b/packages/core/migrate/idempotent/003_tree_access.sql @@ -0,0 +1,128 @@ +------------------------------------------------------------------------------- +-- member_tree_access +------------------------------------------------------------------------------- +create or replace function core.member_tree_access +( _member_id uuid +, _space_id uuid +) +returns table +( tree_path ltree +, access int +) +as $func$ + -- member's grants via groups + select + ta.tree_path + , ta.access + from core.member_groups(_member_id, _space_id) mg + inner join core.tree_access ta on (mg.group_id = ta.principal_id and ta.space_id = _space_id) +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- user_tree_access +------------------------------------------------------------------------------- +create or replace function core.user_tree_access +( _user_id uuid +, _space_id uuid +) +returns table +( tree_path ltree +, access int +) +as $func$ + -- user's direct grants + select + ta.tree_path + , ta.access + from core.principal u + inner join core.principal_space psu on (u.id = psu.principal_id and psu.space_id = _space_id) + inner join core.tree_access ta on (u.id = ta.principal_id and ta.space_id = _space_id) + where u.user_id = _user_id + union + -- user's access via groups + select + x.tree_path + , x.access + from core.member_tree_access(_user_id, _space_id) x +$func$ language sql stable security invoker +; + +------------------------------------------------------------------------------- +-- agent_tree_access +------------------------------------------------------------------------------- +create or replace function core.agent_tree_access +( _agent_id uuid +, _space_id uuid +) +returns table +( tree_path ltree +, access int +) +as $func$ + with agent_access as materialized + ( + -- agent's direct grants + select + ta.tree_path + , ta.access + from core.principal a + inner join core.principal_space ps on (a.id = ps.principal_id and ps.space_id = _space_id) + inner join core.tree_access ta on (a.id = ta.principal_id and ta.space_id = _space_id) + where a.agent_id = _agent_id + union + -- agent's access via groups + select + x.tree_path + , x.access + from core.member_tree_access(_agent_id, _space_id) x + ) + , owner_access as materialized + ( + -- get the access for the user that owns the agent + select + x.tree_path + , x.access + from + ( + select p.owner_id + from core.principal p + where p.agent_id = _agent_id + ) a + cross join lateral core.user_tree_access(a.owner_id, _space_id) x + ) + select + x.tree_path + , max(x.access) + from + ( + -- take the agent's access when it is covered by the owner's access + select + aa.tree_path + , aa.access + from agent_access aa + where exists + ( + -- the owner must have access that is the same or greater than the agent's + select 1 + from owner_access oa + where oa.tree_path @> aa.tree_path + and oa.access >= aa.access + ) + union + -- when the agent has more access than the owner, take the owner's access + select + oa.tree_path + , oa.access + from owner_access oa + where exists + ( + select 1 + from agent_access aa + where aa.tree_path @> oa.tree_path + and aa.access >= oa.access + ) + ) x + group by x.tree_path +$func$ language sql stable security invoker +; diff --git a/packages/core/migrate/idempotent/sql.d.ts b/packages/core/migrate/idempotent/sql.d.ts new file mode 100644 index 0000000..0e51813 --- /dev/null +++ b/packages/core/migrate/idempotent/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const sql: string; + export default sql; +} diff --git a/packages/core/migrate/incremental/001_space.sql b/packages/core/migrate/incremental/001_space.sql new file mode 100644 index 0000000..931d2f7 --- /dev/null +++ b/packages/core/migrate/incremental/001_space.sql @@ -0,0 +1,12 @@ +------------------------------------------------------------------------------- +-- space +------------------------------------------------------------------------------- +create table core.space +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, slug text not null unique check (slug ~ '^[a-z0-9]{12}$') +, name citext not null +, language text not null default 'english' check (language ~ '^[a-z_]+$') +-- we likely need columns for embedding provider, model, dimensions +, created_at timestamptz not null default now() +, updated_at timestamptz +); diff --git a/packages/core/migrate/incremental/002_principal.sql b/packages/core/migrate/incremental/002_principal.sql new file mode 100644 index 0000000..7680911 --- /dev/null +++ b/packages/core/migrate/incremental/002_principal.sql @@ -0,0 +1,35 @@ +------------------------------------------------------------------------------- +-- principal +------------------------------------------------------------------------------- +create table core.principal +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, user_id uuid unique nulls distinct generated always as (case when kind = 'u' then id else null end) stored +, group_id uuid unique nulls distinct generated always as (case when kind = 'g' then id else null end) stored +, agent_id uuid unique nulls distinct generated always as (case when kind = 'a' then id else null end) stored +, member_id uuid unique nulls distinct generated always as (case when kind in ('u', 'a') then id else null end) stored +, owner_id uuid references core.principal (user_id) on delete cascade -- points to agent's owner +, space_id uuid references core.space (id) on delete cascade +, kind text not null check (kind in ('g', 'u', 'a')) -- group, user, agent +, name citext not null check (name::text !~ '/') -- agent names are displayed as / +, created_at timestamptz not null default now() +, updated_at timestamptz +, check + ( + (kind = 'a' and owner_id is not null) -- agents are owned by a user + or + (kind != 'a' and owner_id is null) -- users and groups have no owner + ) +, check + ( + (kind = 'g' and space_id is not null) -- groups belong to a single space + or + (kind != 'g' and space_id is null) -- users and agents are global + ) +); + +-- users must have a globally unique name +create unique index on core.principal (name) include (user_id) where user_id is not null; +-- each user's agents must have a unique name (per that user) +create unique index on core.principal (owner_id, name) where agent_id is not null; +-- each space's groups must have a unique name (per that space) +create unique index on core.principal (space_id, name) where group_id is not null; diff --git a/packages/core/migrate/incremental/003_principal_space.sql b/packages/core/migrate/incremental/003_principal_space.sql new file mode 100644 index 0000000..a00b996 --- /dev/null +++ b/packages/core/migrate/incremental/003_principal_space.sql @@ -0,0 +1,13 @@ +------------------------------------------------------------------------------- +-- principal_space +------------------------------------------------------------------------------- +create table core.principal_space +( space_id uuid not null references core.space (id) on delete cascade +, principal_id uuid not null references core.principal (id) on delete cascade -- can be users, agents, or groups +, admin bool not null default false +, created_at timestamptz not null default now() +, updated_at timestamptz +, unique (principal_id, space_id) include (admin) +); + +create index on core.principal_space (space_id, principal_id) include (admin); diff --git a/packages/core/migrate/incremental/004_group_member.sql b/packages/core/migrate/incremental/004_group_member.sql new file mode 100644 index 0000000..4821cf6 --- /dev/null +++ b/packages/core/migrate/incremental/004_group_member.sql @@ -0,0 +1,15 @@ +------------------------------------------------------------------------------- +-- group_member +------------------------------------------------------------------------------- +create table core.group_member +( space_id uuid not null references core.space (id) on delete cascade +, group_id uuid not null references core.principal (group_id) on delete cascade -- can only be groups +, member_id uuid not null references core.principal (member_id) on delete cascade -- can be users or agents, but not groups +, admin bool not null default false +, created_at timestamptz not null default now() +, updated_at timestamptz +, unique (space_id, member_id, group_id) include (admin) +); + +-- index for listing groups in a space and/or members of a group +create index on core.group_member (space_id, group_id, member_id) include (admin); diff --git a/packages/core/migrate/incremental/005_tree_access.sql b/packages/core/migrate/incremental/005_tree_access.sql new file mode 100644 index 0000000..2efb1c9 --- /dev/null +++ b/packages/core/migrate/incremental/005_tree_access.sql @@ -0,0 +1,12 @@ +------------------------------------------------------------------------------- +-- tree_access +------------------------------------------------------------------------------- +create table core.tree_access +( space_id uuid not null references core.space (id) on delete cascade +, principal_id uuid not null references core.principal (id) on delete cascade -- can be users, agents, or groups +, tree_path ltree not null +, access int not null check (access in (1, 2, 3)) -- 1 = read, 2 = write, 3 = owner +, created_at timestamptz not null default now() +, updated_at timestamptz +, unique (space_id, principal_id, tree_path) include (access) +); diff --git a/packages/core/migrate/incremental/006_api_key.sql b/packages/core/migrate/incremental/006_api_key.sql new file mode 100644 index 0000000..85b7250 --- /dev/null +++ b/packages/core/migrate/incremental/006_api_key.sql @@ -0,0 +1,13 @@ +------------------------------------------------------------------------------- +-- api_key +------------------------------------------------------------------------------- +create table core.api_key +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, member_id uuid not null references core.principal (member_id) on delete cascade -- may be users or agents, not groups +, lookup_id text unique not null check (lookup_id ~ '^[A-Za-z0-9_-]{16}$') +, secret text not null -- hashed secret +, name text not null +, created_at timestamptz not null default now() +, expires_at timestamptz +, unique (member_id, name) +); diff --git a/packages/core/migrate/incremental/sql.d.ts b/packages/core/migrate/incremental/sql.d.ts new file mode 100644 index 0000000..0e51813 --- /dev/null +++ b/packages/core/migrate/incremental/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const sql: string; + export default sql; +} diff --git a/packages/core/migrate/migrate.ts b/packages/core/migrate/migrate.ts new file mode 100644 index 0000000..c9e5fe0 --- /dev/null +++ b/packages/core/migrate/migrate.ts @@ -0,0 +1,522 @@ +import { createHash } from "node:crypto"; +import { info, reportError, span } from "@pydantic/logfire-node"; +import { SQL, semver } from "bun"; +import { CORE_SCHEMA_VERSION } from "../version"; +import incremental001 from "./incremental/001_space.sql" with { type: "text" }; +import incremental002 from "./incremental/002_principal.sql" with { + type: "text", +}; +import incremental003 from "./incremental/003_principal_space.sql" with { + type: "text", +}; +import incremental004 from "./incremental/004_group_member.sql" with { + type: "text", +}; +import incremental005 from "./incremental/005_tree_access.sql" with { + type: "text", +}; +import incremental006 from "./incremental/006_api_key.sql" with { + type: "text", +}; +import provisionSql from "./provision.sql" with { type: "text" }; + +interface Incremental { + name: string; + file: string; + sql: string; +} + +const incrementals: Incremental[] = [ + { name: "001_space", file: "incremental/001_space.sql", sql: incremental001 }, + { + name: "002_principal", + file: "incremental/002_principal.sql", + sql: incremental002, + }, + { + name: "003_principal_space", + file: "incremental/003_principal_space.sql", + sql: incremental003, + }, + { + name: "004_group_member", + file: "incremental/004_group_member.sql", + sql: incremental004, + }, + { + name: "005_tree_access", + file: "incremental/005_tree_access.sql", + sql: incremental005, + }, + { + name: "006_api_key", + file: "incremental/006_api_key.sql", + sql: incremental006, + }, +]; + +import idempotent000 from "./idempotent/000_update.sql" with { type: "text" }; +import idempotent001 from "./idempotent/001_principal_space.sql" with { + type: "text", +}; +import idempotent002 from "./idempotent/002_group_member.sql" with { + type: "text", +}; +import idempotent003 from "./idempotent/003_tree_access.sql" with { + type: "text", +}; + +interface Idempotent { + name: string; + file: string; + sql: string; +} + +const idempotents: Idempotent[] = [ + { name: "000_update", file: "idempotent/000_update.sql", sql: idempotent000 }, + { + name: "001_principal_space", + file: "idempotent/001_principal_space.sql", + sql: idempotent001, + }, + { + name: "002_group_member", + file: "idempotent/002_group_member.sql", + sql: idempotent002, + }, + { + name: "003_tree_access", + file: "idempotent/003_tree_access.sql", + sql: idempotent003, + }, +]; + +const CORE_SCHEMA = "core"; +const REQUIRED_EXTENSIONS = [ + { name: "citext", minVersion: "1.6" }, + { name: "ltree", minVersion: "1.3" }, + { name: "vector", minVersion: "0.8.2" }, + { name: "pg_textsearch", minVersion: "1.1.0" }, +] as const; + +export interface MigrateCoreOptions { + logSqlFiles?: boolean; + statementTimeout?: string; + lockTimeout?: string; + transactionTimeout?: string; + idleInTransactionSessionTimeout?: string; +} + +interface NormalizedMigrateCoreOptions { + logSqlFiles: boolean; + schemaVersion: string; + statementTimeout: string; + lockTimeout: string; + transactionTimeout: string; + idleInTransactionSessionTimeout: string; +} + +export async function migrateCore( + sql: SQL, + options: MigrateCoreOptions = {}, +): Promise { + const opts = normalizeMigrateCoreOptions(options); + const attributes = migrateAttributes(opts); + + await span("core.migrate", { + attributes, + callback: async () => { + try { + if (!semver.satisfies(opts.schemaVersion, "*")) { + throw new Error(`Invalid schema version: "${opts.schemaVersion}"`); + } + const [key1, key2] = advisoryLockKey("memory-core:schema:core"); + + await sql.begin(async (tx) => { + await tx`select set_config('statement_timeout', ${opts.statementTimeout}, true)`; + await tx`select set_config('lock_timeout', ${opts.lockTimeout}, true)`; + await tx`select set_config('transaction_timeout', ${opts.transactionTimeout}, true)`; + await tx`select set_config('idle_in_transaction_session_timeout', ${opts.idleInTransactionSessionTimeout}, true)`; + const acquired = await span("core.migrate.acquire_lock", { + attributes, + callback: () => acquireAdvisoryLock(tx, key1, key2), + }); + if (!acquired) { + throw new Error("Unable to acquire lock for core migrations."); + } + + await ensurePostgresVersion(tx); + for (const extension of REQUIRED_EXTENSIONS) { + await span("core.migrate.ensure_extension", { + attributes: { + "db.extension": extension.name, + "db.extension_min_version": extension.minVersion, + }, + callback: () => + ensureExtension(tx, extension.name, extension.minVersion), + }); + } + + if (!(await doesCoreExist(tx))) { + await span("core.migrate.provision", { + attributes: { + ...attributes, + "core.migration_file": "provision.sql", + "core.migration_type": "provision", + }, + callback: () => provisionCore(tx, opts), + }); + info("Core schema provisioned", attributes); + } + await span("core.migrate.run", { + attributes, + callback: () => runMigrations(tx, opts), + }); + }); + info("Core migrations completed", attributes); + } catch (error) { + reportError("Core migration failed", error as Error, attributes); + throw error; + } + }, + }); +} + +function migrateAttributes( + options: NormalizedMigrateCoreOptions, +): Record { + return { + "db.schema": CORE_SCHEMA, + "core.schema_version": options.schemaVersion, + "core.required_extensions": REQUIRED_EXTENSIONS.map( + (extension) => `${extension.name}@>=${extension.minVersion}`, + ), + "db.statement_timeout": options.statementTimeout, + "db.lock_timeout": options.lockTimeout, + "db.transaction_timeout": options.transactionTimeout, + "db.idle_in_transaction_session_timeout": + options.idleInTransactionSessionTimeout, + }; +} + +function normalizeMigrateCoreOptions( + options: MigrateCoreOptions, +): NormalizedMigrateCoreOptions { + return { + logSqlFiles: options.logSqlFiles ?? false, + schemaVersion: CORE_SCHEMA_VERSION, + statementTimeout: options.statementTimeout ?? "20s", + lockTimeout: options.lockTimeout ?? "5s", + transactionTimeout: options.transactionTimeout ?? "1min", + idleInTransactionSessionTimeout: + options.idleInTransactionSessionTimeout ?? "5s", + }; +} + +function advisoryLockKey(schema: string): [number, number] { + const digest = createHash("sha256").update(schema).digest(); + return [digest.readInt32BE(0), digest.readInt32BE(4)]; +} + +const MAX_LOCK_RETRIES = 5; +const BASE_DELAY_MS = 100; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function acquireAdvisoryLock( + tx: SQL, + key1: number, + key2: number, +): Promise { + let acquired = false; + for (let attempt = 0; attempt < MAX_LOCK_RETRIES; attempt++) { + const [result] = await tx` + select pg_try_advisory_xact_lock(${key1}, ${key2}) as acquired + `; + if (result.acquired) { + acquired = true; + break; + } + if (attempt < MAX_LOCK_RETRIES - 1) { + await sleep(BASE_DELAY_MS * 2 ** attempt); + } + } + return acquired; +} + +async function doesCoreExist(tx: SQL): Promise { + const [{ coreExists }] = await tx` + select exists + ( + select 1 + from pg_namespace n + where n.nspname = ${CORE_SCHEMA} + ) as "coreExists" + `; + return coreExists; +} + +async function provisionCore( + tx: SQL, + options: NormalizedMigrateCoreOptions, +): Promise { + await executeSqlFile(tx, options, "provision", "provision.sql", provisionSql); +} + +async function ensurePostgresVersion(tx: SQL): Promise { + const [{ server_version_num }] = await tx` + select current_setting('server_version_num')::int as server_version_num + `; + if (server_version_num < 180000) { + throw new Error( + `PostgreSQL version 18 or higher is required (found ${server_version_num})`, + ); + } +} + +async function ensureExtension( + tx: SQL, + name: string, + minVersion: string, +): Promise { + const [installed] = await tx` + select x.extversion, n.nspname + from pg_extension x + inner join pg_namespace n on (x.extnamespace = n.oid) + where x.extname = ${name} + `; + + if (installed) { + if ( + installed.nspname === "public" && + semver.order(installed.extversion, minVersion) >= 0 + ) { + return; + } + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required in the "public" schema (found ${installed.extversion} installed in "${installed.nspname}")`, + ); + } + + const [available] = await tx` + select default_version + from pg_available_extensions + where name = ${name} + `; + + if (!available || semver.order(available.default_version, minVersion) < 0) { + const found = available + ? `found ${available.default_version} available` + : "not available"; + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required (${found})`, + ); + } + + await tx`create extension if not exists ${tx(name)} with schema public`; +} + +async function runMigrations( + tx: SQL, + options: NormalizedMigrateCoreOptions, +): Promise { + await assertSchemaOwnership(tx); + + const [{ version: dbVersion }] = await tx` + select version from core.version + `; + const cmp = semver.order(options.schemaVersion, dbVersion); + if (cmp < 0) { + throw new Error( + `Application version (${options.schemaVersion}) is older than database version (${dbVersion}). ` + + "Please upgrade the application.", + ); + } + /* run migrations regardless + if (cmp === 0) { + info("Core migration skipped, version current", { + "db.schema": CORE_SCHEMA, + "core.version": dbVersion, + "core.schema_version": options.schemaVersion, + }); + return; + } + */ + + const sorted1 = [...incrementals].sort((a, b) => + a.name.localeCompare(b.name), + ); + + for (const migration of sorted1) { + const [{ existing }] = await tx` + select exists + ( + select 1 + from core.migration + where name = ${migration.name} + ) as existing + `; + + if (existing) { + continue; + } + + await span("core.migrate.incremental", { + attributes: { + "db.schema": CORE_SCHEMA, + "core.migration": migration.name, + "core.migration_file": migration.file, + "core.migration_type": "incremental", + "core.schema_version": options.schemaVersion, + }, + callback: async () => { + await executeSqlFile( + tx, + options, + "incremental", + migration.file, + migration.sql, + ); + await tx` + insert into core.migration (name, applied_at_version) + values (${migration.name}, ${options.schemaVersion})`; + }, + }); + info("Core migration applied", { + "db.schema": CORE_SCHEMA, + "core.migration": migration.name, + "core.migration_file": migration.file, + "core.migration_type": "incremental", + "core.schema_version": options.schemaVersion, + }); + } + + const sorted2 = [...idempotents].sort((a, b) => a.name.localeCompare(b.name)); + + for (const migration of sorted2) { + await span("core.migrate.idempotent", { + attributes: { + "db.schema": CORE_SCHEMA, + "core.migration": migration.name, + "core.migration_file": migration.file, + "core.migration_type": "idempotent", + "core.schema_version": options.schemaVersion, + }, + callback: () => + executeSqlFile( + tx, + options, + "idempotent", + migration.file, + migration.sql, + ), + }); + } + + await tx`update core.version set version = ${options.schemaVersion}, at = now()`; +} + +async function executeSqlFile( + tx: SQL, + options: NormalizedMigrateCoreOptions, + type: string, + file: string, + sqlText: string, +): Promise { + logSqlFile(options, type, file); + try { + await tx.unsafe(sqlText); + } catch (error) { + logSqlExecutionError(options, type, file, sqlText, error); + throw error; + } +} + +function logSqlFile( + options: NormalizedMigrateCoreOptions, + type: string, + file: string, +): void { + if (!options.logSqlFiles) return; + console.error(`[migrate:db] core ${type} packages/core/migrate/${file}`); +} + +function logSqlExecutionError( + options: NormalizedMigrateCoreOptions, + type: string, + file: string, + sqlText: string, + error: unknown, +): void { + if (!options.logSqlFiles) return; + console.error( + `[migrate:db] failed core ${type} packages/core/migrate/${file}`, + ); + logPostgresSqlLocation(sqlText, error); +} + +function logPostgresSqlLocation(sqlText: string, error: unknown): void { + if (!(error instanceof SQL.PostgresError)) return; + const position = Number(error.position); + if (!Number.isSafeInteger(position) || position < 1) return; + + const location = sqlLocation(sqlText, position); + if (!location) return; + console.error( + `[migrate:db] sql position ${position} -> line ${location.line}, column ${location.column}`, + ); + console.error(sqlContext(sqlText, location.line, location.column)); +} + +function sqlLocation( + sqlText: string, + position: number, +): { line: number; column: number } | undefined { + if (position > sqlText.length + 1) return undefined; + let line = 1; + let column = 1; + for (let i = 0; i < position - 1; i++) { + if (sqlText.charCodeAt(i) === 10) { + line++; + column = 1; + } else { + column++; + } + } + return { line, column }; +} + +function sqlContext(sqlText: string, line: number, column: number): string { + const lines = sqlText.split("\n"); + const start = Math.max(1, line - 2); + const end = Math.min(lines.length, line + 2); + const width = String(end).length; + const output = ["[migrate:db] sql context:"]; + + for (let n = start; n <= end; n++) { + const marker = n === line ? ">" : " "; + output.push(`${marker} ${String(n).padStart(width)} | ${lines[n - 1]}`); + if (n === line) { + output.push(` ${" ".repeat(width)} | ${" ".repeat(column - 1)}^`); + } + } + + return output.join("\n"); +} + +async function assertSchemaOwnership(tx: SQL): Promise { + const [result] = await tx` + select + n.nspowner = (select pg_catalog.to_regrole(current_user)::oid) as is_owner + from pg_catalog.pg_namespace n + where n.nspname = ${CORE_SCHEMA} + `; + + if (!result?.is_owner) { + throw new Error( + "Only the owner of the core schema can run database migrations", + ); + } +} diff --git a/packages/core/migrate/provision.sql b/packages/core/migrate/provision.sql new file mode 100644 index 0000000..479b55c --- /dev/null +++ b/packages/core/migrate/provision.sql @@ -0,0 +1,15 @@ +create schema core; + +create table core.version +( version text not null +, at timestamptz not null default now() +); + +create unique index version_singleton_idx on core.version ((true)); -- only ONE row allowed +insert into core.version (version) values ('0.0.0'); + +create table core.migration +( name text not null constraint migration_pkey primary key +, applied_at_version text not null +, applied_at timestamptz not null default pg_catalog.clock_timestamp() +); diff --git a/packages/core/migrate/sql.d.ts b/packages/core/migrate/sql.d.ts new file mode 100644 index 0000000..0e51813 --- /dev/null +++ b/packages/core/migrate/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const sql: string; + export default sql; +} diff --git a/packages/core/package.json b/packages/core/package.json new file mode 100644 index 0000000..a38ca89 --- /dev/null +++ b/packages/core/package.json @@ -0,0 +1,9 @@ +{ + "name": "@memory.build/core", + "version": "0.2.5", + "private": true, + "type": "module", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1" + } +} diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json new file mode 100644 index 0000000..23b1d27 --- /dev/null +++ b/packages/core/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../../tsconfig.json", + "include": ["**/*.ts", "**/*.d.ts"] +} diff --git a/packages/core/version.ts b/packages/core/version.ts new file mode 100644 index 0000000..9fd61d7 --- /dev/null +++ b/packages/core/version.ts @@ -0,0 +1 @@ +export const CORE_SCHEMA_VERSION = "0.0.1"; diff --git a/packages/space/index.ts b/packages/space/index.ts new file mode 100644 index 0000000..2ca99e8 --- /dev/null +++ b/packages/space/index.ts @@ -0,0 +1,9 @@ +export { bootstrapSpaceDatabase } from "./migrate/bootstrap"; +export { type MigrateSpaceOptions, migrateSpace } from "./migrate/migrate"; +export { + isValidSlug, + isValidSpaceSchema, + schemaToSlug, + slugToSchema, +} from "./slug"; +export { SPACE_SCHEMA_VERSION } from "./version"; diff --git a/packages/space/migrate/bootstrap.ts b/packages/space/migrate/bootstrap.ts new file mode 100644 index 0000000..07d67ec --- /dev/null +++ b/packages/space/migrate/bootstrap.ts @@ -0,0 +1,151 @@ +import { info, reportError, span } from "@pydantic/logfire-node"; +import { type SQL, semver } from "bun"; + +const REQUIRED_EXTENSIONS = [ + { name: "citext", minVersion: "1.6" }, + { name: "ltree", minVersion: "1.3" }, + { name: "vector", minVersion: "0.8.2" }, + { name: "pg_textsearch", minVersion: "1.1.0" }, +] as const; + +/** + * Prepare a physical database to host space schemas. + * + * This does not create or migrate an individual space. Spaces are still created + * on demand by migrateSpace(), which provisions a specific me_ schema. + */ +export async function bootstrapSpaceDatabase( + sql: SQL, + statementTimeout: string = "20s", + lockTimeout: string = "5s", + transactionTimeout: string = "30s", + idleInTransactionSessionTimeout: string = "30s", + shardId?: number, +): Promise { + const attributes = { + "db.shard": shardId, + "db.statement_timeout": statementTimeout, + "db.lock_timeout": lockTimeout, + "db.transaction_timeout": transactionTimeout, + "db.idle_in_transaction_session_timeout": idleInTransactionSessionTimeout, + "space.required_extensions": REQUIRED_EXTENSIONS.map( + (extension) => `${extension.name}@>=${extension.minVersion}`, + ), + }; + + await span("space.bootstrap", { + attributes, + callback: async () => { + try { + await sql.begin(async (tx) => { + if (shardId !== undefined) { + await tx.unsafe(`set local pgdog.shard to ${String(shardId)}`); + } + await ensurePostgresVersion(tx); + await span("space.bootstrap.acquire_lock", { + callback: () => acquireAdvisoryLock(tx), + }); + await tx`select set_config('statement_timeout', ${statementTimeout}, true)`; + await tx`select set_config('lock_timeout', ${lockTimeout}, true)`; + await tx`select set_config('transaction_timeout', ${transactionTimeout}, true)`; + await tx`select set_config('idle_in_transaction_session_timeout', ${idleInTransactionSessionTimeout}, true)`; + for (const extension of REQUIRED_EXTENSIONS) { + await span("space.bootstrap.ensure_extension", { + attributes: { + "db.extension": extension.name, + "db.extension_min_version": extension.minVersion, + }, + callback: () => + ensureExtension(tx, extension.name, extension.minVersion), + }); + } + }); + info("Space bootstrap completed", attributes); + } catch (error) { + reportError("Space bootstrap failed", error as Error, attributes); + throw error; + } + }, + }); +} + +const MAX_LOCK_RETRIES = 5; +const BASE_DELAY_MS = 100; +const BOOTSTRAP_LOCK_ID = 1982010637711; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function acquireAdvisoryLock(tx: SQL): Promise { + let acquired = false; + for (let attempt = 0; attempt < MAX_LOCK_RETRIES; attempt++) { + const [result] = await tx` + select pg_try_advisory_xact_lock(${BOOTSTRAP_LOCK_ID}) as acquired + `; + if (result.acquired) { + acquired = true; + break; + } + if (attempt < MAX_LOCK_RETRIES - 1) { + await sleep(BASE_DELAY_MS * 2 ** attempt); + } + } + + if (!acquired) { + throw new Error(`Failed to acquire advisory lock`); + } +} + +async function ensurePostgresVersion(tx: SQL): Promise { + const [{ server_version_num }] = await tx` + select current_setting('server_version_num')::int as server_version_num + `; + if (server_version_num < 180000) { + throw new Error( + `PostgreSQL version 18 or higher is required (found ${server_version_num})`, + ); + } +} + +async function ensureExtension( + tx: SQL, + name: string, + minVersion: string, +): Promise { + const [installed] = await tx` + select x.extversion, n.nspname + from pg_extension x + inner join pg_namespace n on (x.extnamespace = n.oid) + where x.extname = ${name} + `; + + if (installed) { + if ( + installed.nspname === "public" && + semver.order(installed.extversion, minVersion) >= 0 + ) { + return; + } + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required in the "public" schema (found ${installed.extversion} installed in "${installed.nspname}")`, + ); + } + + const [available] = await tx` + select default_version + from pg_available_extensions + where name = ${name} + `; + + if (!available || semver.order(available.default_version, minVersion) < 0) { + const found = available + ? `found ${available.default_version} available` + : "not available"; + throw new Error( + `Extension "${name}" version ${minVersion} or higher is required (${found})`, + ); + } + + await tx`create extension if not exists ${tx(name)} with schema public`; +} diff --git a/packages/space/migrate/idempotent/001_memory.sql b/packages/space/migrate/idempotent/001_memory.sql new file mode 100644 index 0000000..c4048d8 --- /dev/null +++ b/packages/space/migrate/idempotent/001_memory.sql @@ -0,0 +1,617 @@ +------------------------------------------------------------------------------- +-- memory triggers +------------------------------------------------------------------------------- +create or replace function {{schema}}.memory_before_update() +returns trigger +as $func$ +begin + -- always update the timestamp + new.updated_at = pg_catalog.now(); + + -- content changed -> new embedding needs to be generated + if old.content is distinct from new.content + and old.embedding is not distinct from new.embedding + then + new.embedding = null; + new.embedding_version = old.embedding_version operator(pg_catalog.+) 1; + end if; + + return new; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp -- public required for pgvector's `is not distinct from` +; + +create or replace trigger memory_before_update_trg +before update on {{schema}}.memory +for each row +execute function {{schema}}.memory_before_update(); + +------------------------------------------------------------------------------- +-- tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.tree_access(_tree_access jsonb) +returns table +( tree_path ltree +, access int +) +as $func$ + select + x.tree_path + , x.access + from jsonb_to_recordset(_tree_access) x(tree_path ltree, access int) +$func$ language sql immutable strict security invoker +; + +------------------------------------------------------------------------------- +-- has_tree_access +------------------------------------------------------------------------------- +create or replace function {{schema}}.has_tree_access +( _tree_access jsonb +, _tree_path ltree +, _access int +) +returns bool +as $func$ + select exists + ( + select 1 + from {{schema}}.tree_access(_tree_access) x + where x.tree_path @> _tree_path + and x.access >= _access + ) +$func$ language sql immutable strict security invoker +; + +------------------------------------------------------------------------------- +-- get memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.get_memory +( _tree_access jsonb +, _id uuid default null +) +returns table +( id uuid +, tree ltree +, meta jsonb +, temporal tstzrange +, content text +, created_at timestamptz +, updated_at timestamptz +, has_embedding bool +) +as $func$ + select + m.id + , m.tree + , m.meta + , m.temporal + , m.content + , m.created_at + , m.updated_at + , m.embedding is not null + from {{schema}}.memory m + where m.id = _id + and {{schema}}.has_tree_access(_tree_access, m.tree, 1) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- create memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.create_memory +( _tree_access jsonb +, _tree ltree +, _content text +, _id uuid default null +, _meta jsonb default '{}' +, _temporal tstzrange default null +) +returns uuid +as $func$ +begin + if not {{schema}}.has_tree_access(_tree_access, _tree, 2) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + insert into {{schema}}.memory + ( id + , tree + , meta + , temporal + , content + ) + values + ( coalesce(_id, uuidv7()) + , _tree + , coalesce(_meta, '{}'::jsonb) + , _temporal + , _content + ) + returning id into strict _id + ; + return _id; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- patch memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.patch_memory +( _tree_access jsonb +, _id uuid +, _patch jsonb +) +returns bool +as $func$ +declare + _src ltree; + _dst ltree; + _ok bool; +begin + -- at least one valid field must be present + select count(*) filter (where k in ('meta', 'tree', 'temporal', 'content')) > 0 + into strict _ok + from jsonb_each(_patch) o(k, v) + ; + + if not _ok then + raise exception 'no valid patch fields found' + using errcode = 'invalid_parameter_value'; + end if; + + _dst = (_patch->>'tree')::ltree; + + -- cannot set tree to null + if _patch ? 'tree' and _dst is null then + raise exception 'tree cannot be set to null' + using errcode = 'invalid_parameter_value'; + end if; + + -- find the existing memory and get it's tree + select m.tree into _src + from {{schema}}.memory m + where m.id = _id + for update -- don't let anyone "move" the memory while we're working on it + ; + + if not found then + return false; + end if; + + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.tree_access(_tree_access) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 2 + ) + and + ( + _dst is null + or _src @> _dst + or exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + ) + into strict _ok + ; + + if not _ok then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + update {{schema}}.memory m set + tree = case when _patch ? 'tree' then (_patch->>'tree')::ltree else m.tree end + , meta = case when _patch ? 'meta' then _patch->'meta' else m.meta end + , temporal = case when _patch ? 'temporal' then (_patch->>'temporal')::tstzrange else m.temporal end + , content = case when _patch ? 'content' then _patch->>'content' else m.content end + where id = _id + returning id into _id + ; + + return _id is not null; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- move tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.move_tree +( _tree_access jsonb +, _src ltree +, _dst ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_src bool; + _has_dst bool; + _moved bigint; +begin + -- must have read/write on _src + -- must have read/write on _dst + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.tree_access(_tree_access) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 2 + ) + , exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + into strict _has_src, _has_dst + ; + + if not _has_src then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if not _has_dst then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + with x as + ( + select m.id + from {{schema}}.memory m + where _src @> m.tree + ) + , u as + ( + update {{schema}}.memory m + set tree = + case + when nlevel(m.tree) = nlevel(_src) then _dst + else _dst || subpath(m.tree, nlevel(_src), nlevel(m.tree) - nlevel(_src)) + end + from x + where m.id = x.id + and not _dry_run + ) + select count(*) into strict _moved + from x + ; + return _moved; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- copy tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.copy_tree +( _tree_access jsonb +, _src ltree +, _dst ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_src bool; + _has_dst bool; + _copied bigint; +begin + -- must have read on _src + -- must have read/write on _dst + with a as materialized + ( + select a.tree_path, a.access + from {{schema}}.tree_access(_tree_access) a + ) + select + exists + ( + select 1 + from a + where a.tree_path @> _src + and a.access >= 1 + ) + , exists + ( + select 1 + from a + where a.tree_path @> _dst + and a.access >= 2 + ) + into strict _has_src, _has_dst + ; + + if not _has_src then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if not _has_dst then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + with m as + ( + select m.* + from {{schema}}.memory m + where _src @> m.tree + ) + , i as + ( + insert into {{schema}}.memory + ( meta + , tree + , temporal + , content + , embedding + , embedding_version + ) + select + m.meta + , case + when nlevel(m.tree) = nlevel(_src) then _dst + else _dst || subpath(m.tree, nlevel(_src), nlevel(m.tree) - nlevel(_src)) + end as dst + , m.temporal + , m.content + , m.embedding + , m.embedding_version + from m + where not _dry_run + ) + select count(*) into strict _copied + from m + ; + + return _copied; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- delete memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.delete_memory +( _tree_access jsonb +, _id uuid +) +returns bool +as $func$ +declare + _tree ltree; +begin + select m.tree into _tree + from {{schema}}.memory m + where m.id = _id + for update + ; + + if not found then + return false; + end if; + + if not {{schema}}.has_tree_access(_tree_access, _tree, 2) then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + delete from {{schema}}.memory + where id = _id + ; + return found; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- delete tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.delete_tree +( _tree_access jsonb +, _tree ltree +, _dry_run bool default false +) +returns bigint +as $func$ +declare + _has_access bool; + _deleted bigint; +begin + -- must have read/write on _tree + select exists + ( + select 1 + from {{schema}}.tree_access(_tree_access) a + where a.tree_path @> _tree + and a.access >= 2 + ) + into strict _has_access + ; + + if not _has_access then + raise exception 'insufficient tree access' + using errcode = 'insufficient_privilege'; + end if; + + if _dry_run then + select count(*) into strict _deleted + from {{schema}}.memory m + where _tree @> m.tree + ; + else + with d as + ( + delete from {{schema}}.memory m + where _tree @> m.tree + returning id + ) + select count(*) into strict _deleted + from d + ; + end if; + + return _deleted; +end; +$func$ language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _tree_access jsonb +, _tree ltree +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.tree_access(_tree_access) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where _tree @> m.tree + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _tree_access jsonb +, _query lquery +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.tree_access(_tree_access) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where m.tree ~ _query + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- count tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.count_tree +( _tree_access jsonb +, _query ltxtquery +, _access int4 +) +returns bigint +as $func$ + with x as materialized + ( + select a.tree_path + from {{schema}}.tree_access(_tree_access) a + where a.access >= _access + ) + select count(*) + from {{schema}}.memory m + where m.tree @ _query + and exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- list tree +------------------------------------------------------------------------------- +create or replace function {{schema}}.list_tree +( _tree_access jsonb +, _query lquery +) +returns table +( tree ltree +, count bigint +) +as $func$ + with a as materialized + ( + select a.tree_path + from {{schema}}.tree_access(_tree_access) a + where a.access >= 1 + ) + , m as + ( + select distinct m.id, m.tree + from {{schema}}.memory m + where m.tree ~ _query + and exists + ( + select 1 + from a + where a.tree_path @> m.tree + ) + ) + select + subltree(m.tree, 0, i) as tree + , count(m.id) as count + from m + cross join lateral generate_series(1, nlevel(m.tree)) i + group by 1 + order by 1 +$func$ language sql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; diff --git a/packages/space/migrate/idempotent/002_search.sql b/packages/space/migrate/idempotent/002_search.sql new file mode 100644 index 0000000..6da0335 --- /dev/null +++ b/packages/space/migrate/idempotent/002_search.sql @@ -0,0 +1,332 @@ +------------------------------------------------------------------------------- +-- search_memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.search_memory +( _tree_access jsonb +, _bm25 bm25query default null +, _vec halfvec({{embedding_dimensions}}) default null +, _max_vec_dist float8 default null +, _ltree ltree default null +, _lquery lquery default null +, _ltxtquery ltxtquery default null +, _meta_contains jsonb default null +, _temporal_within tstzrange default null +, _temporal_overlaps tstzrange default null +, _temporal_before timestamptz default null +, _temporal_after timestamptz default null +, _regexp text default null +, _limit bigint default 10 +) +returns table +( id uuid +, meta jsonb +, tree ltree +, temporal tstzrange +, content text +, has_embedding bool +, created_at timestamptz +, updated_at timestamptz +, score float8 +) +as $func$ +declare + _filter_count int = 0; + _score text; + _filters text[] = '{}'::text; + _order_by text; + _sql text; +begin + -- _bm25 OR _vec but NOT BOTH + if _bm25 is not null and _vec is not null then + raise exception 'providing both _bm25 and _vec is not supported' + using errcode = 'invalid_parameter_value'; + end if; + + if _max_vec_dist is not null and _vec is null then + raise exception '_max_vec_dist provided but _vec was not provided' + using errcode = 'invalid_parameter_value'; + end if; + + -- min 1, max 1000, default 10 + _limit = greatest(least(coalesce(_limit, 10), 1000), 1); + + -- bm25 or semantic + -- score and order by + case + when _bm25 is not null then + _filter_count = _filter_count + 1; + -- <@> is negative bm25 score. smaller values means better match. order by this for index scans + -- negative score * -1 = score. higher score means better match + _score = format($sql$, (m.content <@> %L::bm25query) * -1 as score$sql$, _bm25); + _order_by = format($sql$order by m.content <@> %L::bm25query, m.id$sql$, _bm25); + when _vec is not null then + _filter_count = _filter_count + 1; + -- <=> is cosine distance. smaller distance means better match. order by this for index scans + -- distance * -1 = "score". higher score means better match + _score = format($sql$, (m.embedding <=> %L::halfvec({{embedding_dimensions}})) * -1 as score$sql$, _vec); + _order_by = format($sql$order by m.embedding <=> %L::halfvec({{embedding_dimensions}}), m.id$sql$, _vec); + _filters = array_append + ( _filters + , $sql$and m.embedding is not null$sql$ + ); + if _max_vec_dist is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and (m.embedding <=> %L::halfvec({{embedding_dimensions}})) <= %L::float8$sql$, _vec, _max_vec_dist) + ); + end if; + else + _score = $sql$, -1 as score$sql$; + _order_by = $sql$order by m.id$sql$; + end case; + + -- ltree + if _ltree is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::ltree @> m.tree$sql$, _ltree) + ); + end if; + + -- lquery + if _lquery is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.tree ~ %L::lquery$sql$, _lquery) + ); + end if; + + -- ltxtquery + if _ltxtquery is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.tree @ %L::ltxtquery$sql$, _ltxtquery) + ); + end if; + + -- meta_contains + if _meta_contains is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and m.meta @> %L::jsonb$sql$, _meta_contains) + ); + end if; + + -- temporal_within + if _temporal_within is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::tstzrange @> m.temporal$sql$, _temporal_within) + ); + end if; + + -- temporal_overlaps + if _temporal_overlaps is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and %L::tstzrange && m.temporal$sql$, _temporal_overlaps) + ); + end if; + + -- temporal_before + if _temporal_before is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and tstzrange('-infinity'::timestamptz, %L::timestamptz, '[]') @> m.temporal$sql$, _temporal_before) + ); + end if; + + -- temporal_after + if _temporal_after is not null then + _filter_count = _filter_count + 1; + _filters = array_append + ( _filters + , format($sql$and tstzrange(%L::timestamptz, 'infinity'::timestamptz, '[]') @> m.temporal$sql$, _temporal_after) + ); + end if; + + -- regexp + if _regexp is not null then + if _filter_count = 0 then + raise exception 'regexp must not be the only filter criteria' + using errcode = 'invalid_parameter_value'; + end if; + _filters = array_append + ( _filters + , format($sql$and m.content ~* %L::text$sql$, _regexp) + ); + end if; + + -- construct the query + _sql = format( + $sql$ + with x as materialized + ( + select x.tree_path + from jsonb_to_recordset($1) x(tree_path ltree, access int) + where x.access >= 1 + ) + select + m.id + , m.meta + , m.tree + , m.temporal + , m.content + , m.embedding is not null + , m.created_at + , m.updated_at + %s + from {{schema}}.memory m + where exists + ( + select 1 + from x + where x.tree_path @> m.tree + ) + %s + %s + limit $2 + $sql$ + , _score + , coalesce + ( + ( + select string_agg(x, E'\n ') + from unnest(_filters) x + ) + , '' + ) + , _order_by + ); + + return query execute _sql using _tree_access, _limit; +end; +$func$ language plpgsql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; + +------------------------------------------------------------------------------- +-- hybrid_search_memory +------------------------------------------------------------------------------- +create or replace function {{schema}}.hybrid_search_memory +( _tree_access jsonb +, _bm25 bm25query +, _vec halfvec({{embedding_dimensions}}) +, _max_vec_dist float8 default null +, _ltree ltree default null +, _lquery lquery default null +, _ltxtquery ltxtquery default null +, _meta_contains jsonb default null +, _temporal_within tstzrange default null +, _temporal_overlaps tstzrange default null +, _temporal_before timestamptz default null +, _temporal_after timestamptz default null +, _regexp text default null +, _k float8 default 60.0 +, _candidate_limit bigint default 30 +, _fulltext_weight float8 default 1.0 +, _semantic_weight float8 default 1.0 +, _limit bigint default 10 +) +returns table +( id uuid +, meta jsonb +, tree ltree +, temporal tstzrange +, content text +, has_embedding bool +, created_at timestamptz +, updated_at timestamptz +, score float8 +) +as $func$ +declare +begin + if _bm25 is null then + raise exception '_bm25 must not be null' + using errcode = 'invalid_parameter_value'; + end if; + + if _vec is null then + raise exception '_vec must not be null' + using errcode = 'invalid_parameter_value'; + end if; + + _k = greatest(coalesce(_k, 60.0), 0.0); + _limit = greatest(least(coalesce(_limit, 10), 1000), 1); + _candidate_limit = greatest + ( least(coalesce(_candidate_limit, 30), 1000) + , _limit + ); + _fulltext_weight = greatest(least(coalesce(_fulltext_weight, 1.0), 1.0), 0.0); + _semantic_weight = greatest(least(coalesce(_semantic_weight, 1.0), 1.0), 0.0); + + -- reciprocal rank fusion + return query + select + coalesce(x1.id, x2.id) as id + , coalesce(x1.meta, x2.meta) as meta + , coalesce(x1.tree, x2.tree) as tree + , coalesce(x1.temporal, x2.temporal) as temporal + , coalesce(x1.content, x2.content) as content + , coalesce(x1.has_embedding, x2.has_embedding) as has_embedding + , coalesce(x1.created_at, x2.created_at) as created_at + , coalesce(x1.updated_at, x2.updated_at) as updated_at + , coalesce(_fulltext_weight / (_k + x1.rank), 0.0) + + coalesce(_semantic_weight / (_k + x2.rank), 0.0) as score + from + ( + select + row_number() over (order by m.score desc, m.id) as rank + , m.* + from {{schema}}.search_memory + ( _tree_access => _tree_access + , _bm25 => _bm25 + , _ltree => _ltree + , _lquery => _lquery + , _ltxtquery => _ltxtquery + , _meta_contains => _meta_contains + , _temporal_within => _temporal_within + , _temporal_overlaps => _temporal_overlaps + , _temporal_before => _temporal_before + , _temporal_after => _temporal_after + , _regexp => _regexp + , _limit => _candidate_limit + ) m + ) x1 + full outer join + ( + select + row_number() over (order by m.score desc, m.id) as rank + , m.* + from {{schema}}.search_memory + ( _tree_access => _tree_access + , _vec => _vec + , _max_vec_dist => _max_vec_dist + , _ltree => _ltree + , _lquery => _lquery + , _ltxtquery => _ltxtquery + , _meta_contains => _meta_contains + , _temporal_within => _temporal_within + , _temporal_overlaps => _temporal_overlaps + , _temporal_before => _temporal_before + , _temporal_after => _temporal_after + , _regexp => _regexp + , _limit => _candidate_limit + ) m + ) x2 on (x1.id = x2.id) + order by score desc, id + limit _limit + ; +end; +$func$ language plpgsql stable security invoker +set search_path to pg_catalog, {{schema}}, public, pg_temp +; diff --git a/packages/space/migrate/idempotent/003_embedding_queue.sql b/packages/space/migrate/idempotent/003_embedding_queue.sql new file mode 100644 index 0000000..5aba816 --- /dev/null +++ b/packages/space/migrate/idempotent/003_embedding_queue.sql @@ -0,0 +1,163 @@ + +------------------------------------------------------------------------------- +-- enqueue_embedding +------------------------------------------------------------------------------- +create or replace function {{schema}}.enqueue_embedding() +returns trigger +as $func$ +begin + insert into {{schema}}.embedding_queue (memory_id, embedding_version) + values (new.id, new.embedding_version); + return new; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +create or replace trigger memory_enqueue_embedding_insert +after insert on {{schema}}.memory +for each row +when (new.embedding is null) -- it's possible to insert with an embedding +execute function {{schema}}.enqueue_embedding() +; + +create or replace trigger memory_enqueue_embedding_update +after update on {{schema}}.memory +for each row +when +( old.content is distinct from new.content + and new.embedding is null +) +execute function {{schema}}.enqueue_embedding() +; + +------------------------------------------------------------------------------- +-- claim_embedding_batch +------------------------------------------------------------------------------- +create or replace function {{schema}}.claim_embedding_batch +( _batch_size int default 10 +, _lock_duration interval default '5 minutes' +, _max_attempts int default 3 +) +returns table +( queue_id bigint +, memory_id uuid +, embedding_version int +, content text +) +as $func$ +declare + _rec record; + _mem record; + _claimed_count int = 0; +begin + -- bulk-cancel visible queue rows superseded by a newer row for the same memory + update {{schema}}.embedding_queue eq + set outcome = 'cancelled' + where eq.outcome is null + and eq.vt <= now() + and exists + ( + select 1 + from {{schema}}.embedding_queue newer + where newer.memory_id = eq.memory_id + and newer.embedding_version > eq.embedding_version + and newer.outcome is null + ); + + -- sweep: finalize exhausted rows orphaned by worker crash + -- (attempts reached max but outcome was never written back) + update {{schema}}.embedding_queue + set + outcome = 'failed' + , last_error = coalesce(last_error, 'exceeded max attempts') + where outcome is null + and vt <= now() + and attempts >= _max_attempts + ; + + for _rec in + ( + select + eq.id + , eq.memory_id + , eq.embedding_version + from {{schema}}.embedding_queue eq + where eq.outcome is null + and eq.vt <= now() + and eq.attempts < _max_attempts + order by eq.vt + for update skip locked + ) + loop + -- check memory still exists + current version + select m.content, m.embedding_version + into _mem + from {{schema}}.memory m + where m.id = _rec.memory_id + ; + + if not found or _mem.content is null then + -- memory deleted or empty → cancel queue row + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + if _rec.embedding_version != _mem.embedding_version then + -- stale version → cancel + update {{schema}}.embedding_queue + set outcome = 'cancelled' + where id = _rec.id; + continue; + end if; + + -- claim this row + update {{schema}}.embedding_queue q set + vt = now() + _lock_duration + , attempts = q.attempts + 1 + where id = _rec.id; + + queue_id = _rec.id; + memory_id = _rec.memory_id; + embedding_version = _rec.embedding_version; + content = _mem.content; + return next; + + _claimed_count = _claimed_count + 1; + exit when _claimed_count >= _batch_size; + end loop; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; + +------------------------------------------------------------------------------- +-- prune embedding queue +------------------------------------------------------------------------------- +-- prune terminal queue rows older than the retention window. +-- runs opportunistically from the worker on spaces that returned no +-- claimable work, so the queue table doesn't grow unbounded. +-- +-- relies on embedding_queue_archive_idx (created_at) where outcome is not null +-- from migration 005, so the no-op case is cheap. +create or replace function {{schema}}.prune_embedding_queue(_retention interval default '7 days') +returns bigint +as $func$ +declare + pruned bigint; +begin + delete from {{schema}}.embedding_queue + where outcome is not null + and created_at < now() - _retention + ; + get diagnostics pruned = row_count; + return pruned; +end; +$func$ +language plpgsql volatile security invoker +set search_path to pg_catalog, {{schema}}, pg_temp +; diff --git a/packages/space/migrate/idempotent/sql.d.ts b/packages/space/migrate/idempotent/sql.d.ts new file mode 100644 index 0000000..89b092e --- /dev/null +++ b/packages/space/migrate/idempotent/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const content: string; + export default content; +} diff --git a/packages/space/migrate/incremental/001_memory.sql b/packages/space/migrate/incremental/001_memory.sql new file mode 100644 index 0000000..c66eea7 --- /dev/null +++ b/packages/space/migrate/incremental/001_memory.sql @@ -0,0 +1,48 @@ +------------------------------------------------------------------------------- +-- memory +------------------------------------------------------------------------------- +create table {{schema}}.memory +( id uuid not null primary key default uuidv7() check (uuid_extract_version(id) = 7) +, meta jsonb not null default '{}' check (jsonb_typeof(meta) = 'object') +, tree ltree not null default ''::ltree +, temporal tstzrange +, content text not null +, embedding halfvec({{embedding_dimensions}}) +, embedding_version int4 not null default 1 +, created_at timestamptz not null default now() +, updated_at timestamptz +); + +-- index for faceted search +create index memory_meta_gin_idx on {{schema}}.memory using gin (meta); + +-- index for temporal search +create index memory_temporal_gist_idx on {{schema}}.memory using gist (temporal) where (temporal is not null); + +-- index for BM25 text search +create index memory_content_bm25_idx on {{schema}}.memory using bm25 (content) +with (text_config = {{bm25_text_config}}, k1 = {{bm25_k1}}, b = {{bm25_b}}); + +-- index for vector similarity search +create index memory_embedding_hnsw_idx on {{schema}}.memory using hnsw (embedding halfvec_cosine_ops) +with (m = {{hnsw_m}}, ef_construction = {{hnsw_ef_construction}}); + +-- index for hierarchical organization +create index memory_tree_gist_idx on {{schema}}.memory using gist (tree); + +/* +enforce consistent temporal range conventions: +- point-in-time events: lower = upper with inclusive bounds '[same,same]' +- time periods: lower < upper with inclusive-exclusive bounds '[start,end)' +*/ +alter table {{schema}}.memory add constraint temporal_bounds_convention check +( + temporal is null + or ( + -- point-in-time: both bounds equal and inclusive + (lower(temporal) = upper(temporal) and lower_inc(temporal) and upper_inc(temporal)) + or + -- time range: start before end, inclusive-exclusive + (lower(temporal) < upper(temporal) and lower_inc(temporal) and not upper_inc(temporal)) + ) +); diff --git a/packages/space/migrate/incremental/002_embedding_queue.sql b/packages/space/migrate/incremental/002_embedding_queue.sql new file mode 100644 index 0000000..468fe45 --- /dev/null +++ b/packages/space/migrate/incremental/002_embedding_queue.sql @@ -0,0 +1,22 @@ +------------------------------------------------------------------------------- +-- embedding queue +------------------------------------------------------------------------------- +-- per-space embedding queue table +create table {{schema}}.embedding_queue +( id bigint generated always as identity primary key +, memory_id uuid not null references {{schema}}.memory(id) on delete cascade +, embedding_version int not null +, vt timestamptz not null default now() +, outcome text check (outcome is null or outcome in ('completed', 'failed', 'cancelled')) +, attempts int not null default 0 +, last_error text +, created_at timestamptz not null default now() +, updated_at timestamptz +); + +-- index to find items to claim +create index embedding_queue_claim_idx on {{schema}}.embedding_queue (vt) where outcome is null; +-- index also used in finding items to claim. used to ensure there aren't any items for the same memory with a newer version +create index embedding_queue_memory_idx on {{schema}}.embedding_queue (memory_id, embedding_version desc) where outcome is null; +-- index to find items that have resolved to an outcome. these can be pruned +create index embedding_queue_archive_idx on {{schema}}.embedding_queue (created_at) where outcome is not null; diff --git a/packages/space/migrate/incremental/sql.d.ts b/packages/space/migrate/incremental/sql.d.ts new file mode 100644 index 0000000..89b092e --- /dev/null +++ b/packages/space/migrate/incremental/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const content: string; + export default content; +} diff --git a/packages/space/migrate/migrate.ts b/packages/space/migrate/migrate.ts new file mode 100644 index 0000000..b5154ba --- /dev/null +++ b/packages/space/migrate/migrate.ts @@ -0,0 +1,504 @@ +import { createHash } from "node:crypto"; +import { info, reportError, span } from "@pydantic/logfire-node"; +import { SQL, semver } from "bun"; +import { isValidSlug, slugToSchema } from "../slug"; +import { SPACE_SCHEMA_VERSION } from "../version"; +import incremental001 from "./incremental/001_memory.sql" with { type: "text" }; +import incremental002 from "./incremental/002_embedding_queue.sql" with { + type: "text", +}; +import provisionSql from "./provision.sql" with { type: "text" }; + +interface Incremental { + name: string; + file: string; + sql: string; +} + +const incrementals: Incremental[] = [ + { + name: "001_memory", + file: "incremental/001_memory.sql", + sql: incremental001, + }, + { + name: "002_embedding_queue", + file: "incremental/002_embedding_queue.sql", + sql: incremental002, + }, +]; + +import idempotent001 from "./idempotent/001_memory.sql" with { type: "text" }; +import idempotent002 from "./idempotent/002_search.sql" with { type: "text" }; +import idempotent003 from "./idempotent/003_embedding_queue.sql" with { + type: "text", +}; + +interface Idempotent { + name: string; + file: string; + sql: string; +} + +const idempotents: Idempotent[] = [ + { name: "001_memory", file: "idempotent/001_memory.sql", sql: idempotent001 }, + { name: "002_search", file: "idempotent/002_search.sql", sql: idempotent002 }, + { + name: "003_embedding_queue", + file: "idempotent/003_embedding_queue.sql", + sql: idempotent003, + }, +]; + +export interface MigrateSpaceOptions { + slug: string; + logSqlFiles?: boolean; + shardId?: number; + embeddingDimensions?: number; + bm25TextConfig?: string; + bm25K1?: number; + bm25B?: number; + hnswM?: number; + hnswEfConstruction?: number; + statementTimeout?: string; + lockTimeout?: string; + transactionTimeout?: string; + idleInTransactionSessionTimeout?: string; +} + +interface NormalizedMigrateSpaceOptions { + slug: string; + logSqlFiles: boolean; + schemaVersion: string; + shardId?: number; + embeddingDimensions: number; + bm25TextConfig: string; + bm25K1: number; + bm25B: number; + hnswM: number; + hnswEfConstruction: number; + statementTimeout: string; + lockTimeout: string; + transactionTimeout: string; + idleInTransactionSessionTimeout: string; +} + +export async function migrateSpace( + sql: SQL, + options: MigrateSpaceOptions, +): Promise { + const opts = normalizeMigrateSpaceOptions(options); + const attributes = migrateAttributes(opts); + + await span("space.migrate", { + attributes, + callback: async () => { + try { + if (!isValidSlug(opts.slug)) { + throw new Error( + `Invalid space slug: "${opts.slug}" — must be 12 lowercase alphanumeric characters`, + ); + } + if (!semver.satisfies(opts.schemaVersion, "*")) { + throw new Error(`Invalid schema version: "${opts.schemaVersion}"`); + } + const schema = slugToSchema(opts.slug); + const schemaAttributes = { ...attributes, "db.schema": schema }; + const [key1, key2] = advisoryLockKey(`memory-space:schema:${schema}`); + + await sql.begin(async (tx) => { + if (opts.shardId !== undefined) { + if (!Number.isSafeInteger(opts.shardId)) { + throw new Error( + `shardId must be a safe integer, got: ${opts.shardId}`, + ); + } + await tx.unsafe(`set local pgdog.shard to ${String(opts.shardId)}`); + } + await tx`select set_config('statement_timeout', ${opts.statementTimeout}, true)`; + await tx`select set_config('lock_timeout', ${opts.lockTimeout}, true)`; + await tx`select set_config('transaction_timeout', ${opts.transactionTimeout}, true)`; + await tx`select set_config('idle_in_transaction_session_timeout', ${opts.idleInTransactionSessionTimeout}, true)`; + const acquired = await span("space.migrate.acquire_lock", { + attributes: schemaAttributes, + callback: () => acquireAdvisoryLock(tx, key1, key2), + }); + if (!acquired) { + throw new Error( + `Unable to acquire lock for space slug ${opts.slug} migrations.`, + ); + } + + if (!(await doesSpaceExist(tx, schema))) { + await span("space.migrate.provision", { + attributes: { + ...schemaAttributes, + "space.migration_file": "provision.sql", + "space.migration_type": "provision", + }, + callback: () => provisionSpace(tx, schema, opts), + }); + info("Space schema provisioned", schemaAttributes); + } + await span("space.migrate.run", { + attributes: schemaAttributes, + callback: () => runMigrations(tx, schema, opts), + }); + }); + info("Space migrations completed", schemaAttributes); + } catch (error) { + reportError("Space migration failed", error as Error, attributes); + throw error; + } + }, + }); +} + +function migrateAttributes( + options: NormalizedMigrateSpaceOptions, +): Record { + return { + "space.slug": options.slug, + "space.schema_version": options.schemaVersion, + "db.shard": options.shardId, + "db.statement_timeout": options.statementTimeout, + "db.lock_timeout": options.lockTimeout, + "db.transaction_timeout": options.transactionTimeout, + "db.idle_in_transaction_session_timeout": + options.idleInTransactionSessionTimeout, + }; +} + +function normalizeMigrateSpaceOptions( + options: MigrateSpaceOptions, +): NormalizedMigrateSpaceOptions { + return { + slug: options.slug, + logSqlFiles: options.logSqlFiles ?? false, + schemaVersion: SPACE_SCHEMA_VERSION, + shardId: options.shardId, + embeddingDimensions: options.embeddingDimensions ?? 1536, + bm25TextConfig: options.bm25TextConfig ?? "english", + bm25K1: options.bm25K1 ?? 1.2, + bm25B: options.bm25B ?? 0.75, + hnswM: options.hnswM ?? 16, + hnswEfConstruction: options.hnswEfConstruction ?? 64, + statementTimeout: options.statementTimeout ?? "20s", + lockTimeout: options.lockTimeout ?? "5s", + transactionTimeout: options.transactionTimeout ?? "1min", + idleInTransactionSessionTimeout: + options.idleInTransactionSessionTimeout ?? "5s", + }; +} + +function templateVars( + schema: string, + options: NormalizedMigrateSpaceOptions, +): Record { + return { + ...options, + schema, + embedding_dimensions: options.embeddingDimensions, + bm25_text_config: options.bm25TextConfig, + bm25_k1: options.bm25K1, + bm25_b: options.bm25B, + hnsw_m: options.hnswM, + hnsw_ef_construction: options.hnswEfConstruction, + }; +} + +function advisoryLockKey(schema: string): [number, number] { + const digest = createHash("sha256").update(schema).digest(); + return [digest.readInt32BE(0), digest.readInt32BE(4)]; +} + +const MAX_LOCK_RETRIES = 5; +const BASE_DELAY_MS = 100; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function acquireAdvisoryLock( + tx: SQL, + key1: number, + key2: number, +): Promise { + let acquired = false; + for (let attempt = 0; attempt < MAX_LOCK_RETRIES; attempt++) { + const [result] = await tx` + select pg_try_advisory_xact_lock(${key1}, ${key2}) as acquired + `; + if (result.acquired) { + acquired = true; + break; + } + if (attempt < MAX_LOCK_RETRIES - 1) { + await sleep(BASE_DELAY_MS * 2 ** attempt); + } + } + return acquired; +} + +async function doesSpaceExist(tx: SQL, schema: string): Promise { + const [{ spaceExists }] = await tx` + select exists + ( + select 1 + from pg_namespace n + where n.nspname = ${schema} + ) as "spaceExists" + `; + return spaceExists; +} + +async function provisionSpace( + tx: SQL, + schema: string, + options: NormalizedMigrateSpaceOptions, +): Promise { + await executeSqlFile( + tx, + options, + schema, + "provision", + "provision.sql", + template(provisionSql, { schema }), + ); +} + +async function runMigrations( + tx: SQL, + schema: string, + options: NormalizedMigrateSpaceOptions, +): Promise { + // check ownership + await assertSchemaOwnership(tx, schema); + + // check version + const [{ version: dbVersion }] = await tx` + select version from ${tx(schema)}.version + `; + const cmp = semver.order(options.schemaVersion, dbVersion); + // abort if target is older than the database + if (cmp < 0) { + throw new Error( + `Application version (${options.schemaVersion}) is older than database version (${dbVersion}). ` + + "Please upgrade the application.", + ); + } + /* run migrations regardless + if (cmp === 0) { + // version matches. no need to run migrations + info("Space migration skipped, version current", { + "db.schema": schema, + "space.version": dbVersion, + "space.schema_version": options.schemaVersion, + }); + return; + } + */ + + // run incremental migrations + const sorted1 = [...incrementals].sort((a, b) => + a.name.localeCompare(b.name), + ); + + for (const migration of sorted1) { + const [{ existing }] = await tx` + select exists + ( + select 1 + from ${tx(schema)}.migration + where name = ${migration.name} + ) as existing + `; + + if (existing) { + continue; + } + + await span("space.migrate.incremental", { + attributes: { + "db.schema": schema, + "space.migration": migration.name, + "space.migration_file": migration.file, + "space.migration_type": "incremental", + "space.schema_version": options.schemaVersion, + }, + callback: async () => { + const renderedSql = template( + migration.sql, + templateVars(schema, options), + ); + await executeSqlFile( + tx, + options, + schema, + "incremental", + migration.file, + renderedSql, + ); + await tx` + insert into ${tx(schema)}.migration (name, applied_at_version) + values (${migration.name}, ${options.schemaVersion})`; + }, + }); + info("Space migration applied", { + "db.schema": schema, + "space.migration": migration.name, + "space.migration_file": migration.file, + "space.migration_type": "incremental", + "space.schema_version": options.schemaVersion, + }); + } + + // run idempotent migrations + const sorted2 = [...idempotents].sort((a, b) => a.name.localeCompare(b.name)); + + for (const migration of sorted2) { + await span("space.migrate.idempotent", { + attributes: { + "db.schema": schema, + "space.migration": migration.name, + "space.migration_file": migration.file, + "space.migration_type": "idempotent", + "space.schema_version": options.schemaVersion, + }, + callback: async () => { + const renderedSql = template( + migration.sql, + templateVars(schema, options), + ); + await executeSqlFile( + tx, + options, + schema, + "idempotent", + migration.file, + renderedSql, + ); + }, + }); + } + + // update version + await tx`update ${tx(schema)}.version set version = ${options.schemaVersion}, at = now()`; +} + +async function executeSqlFile( + tx: SQL, + options: NormalizedMigrateSpaceOptions, + schema: string, + type: string, + file: string, + sqlText: string, +): Promise { + logSqlFile(options, schema, type, file); + try { + await tx.unsafe(sqlText); + } catch (error) { + logSqlExecutionError(options, schema, type, file, sqlText, error); + throw error; + } +} + +function logSqlFile( + options: NormalizedMigrateSpaceOptions, + schema: string, + type: string, + file: string, +): void { + if (!options.logSqlFiles) return; + console.error( + `[migrate:db] space ${schema} ${type} packages/space/migrate/${file}`, + ); +} + +function logSqlExecutionError( + options: NormalizedMigrateSpaceOptions, + schema: string, + type: string, + file: string, + sqlText: string, + error: unknown, +): void { + if (!options.logSqlFiles) return; + console.error( + `[migrate:db] failed space ${schema} ${type} packages/space/migrate/${file}`, + ); + logPostgresSqlLocation(sqlText, error); +} + +function logPostgresSqlLocation(sqlText: string, error: unknown): void { + if (!(error instanceof SQL.PostgresError)) return; + const position = Number(error.position); + if (!Number.isSafeInteger(position) || position < 1) return; + + const location = sqlLocation(sqlText, position); + if (!location) return; + console.error( + `[migrate:db] sql position ${position} -> line ${location.line}, column ${location.column}`, + ); + console.error(sqlContext(sqlText, location.line, location.column)); +} + +function sqlLocation( + sqlText: string, + position: number, +): { line: number; column: number } | undefined { + if (position > sqlText.length + 1) return undefined; + let line = 1; + let column = 1; + for (let i = 0; i < position - 1; i++) { + if (sqlText.charCodeAt(i) === 10) { + line++; + column = 1; + } else { + column++; + } + } + return { line, column }; +} + +function sqlContext(sqlText: string, line: number, column: number): string { + const lines = sqlText.split("\n"); + const start = Math.max(1, line - 2); + const end = Math.min(lines.length, line + 2); + const width = String(end).length; + const output = ["[migrate:db] sql context:"]; + + for (let n = start; n <= end; n++) { + const marker = n === line ? ">" : " "; + output.push(`${marker} ${String(n).padStart(width)} | ${lines[n - 1]}`); + if (n === line) { + output.push(` ${" ".repeat(width)} | ${" ".repeat(column - 1)}^`); + } + } + + return output.join("\n"); +} + +async function assertSchemaOwnership(tx: SQL, schema: string): Promise { + const [result] = await tx` + select + n.nspowner = (select pg_catalog.to_regrole(current_user)::oid) as is_owner + from pg_catalog.pg_namespace n + where n.nspname = ${schema} + `; + + if (!result?.is_owner) { + throw new Error( + `Only the owner of the ${schema} schema can run database migrations`, + ); + } +} + +function template(sql: string, vars: Record): string { + return sql.replace(/\{\{(\w+)\}\}/g, (_, key) => { + if (!(key in vars)) { + throw new Error(`Missing template variable: ${key}`); + } + return String(vars[key]); + }); +} diff --git a/packages/space/migrate/provision.sql b/packages/space/migrate/provision.sql new file mode 100644 index 0000000..e98b9d9 --- /dev/null +++ b/packages/space/migrate/provision.sql @@ -0,0 +1,15 @@ +create schema {{schema}}; + +create table {{schema}}.version +( version text not null +, at timestamptz not null default now() +); + +create unique index version_singleton_idx on {{schema}}.version ((true)); -- only ONE row allowed +insert into {{schema}}.version (version) values ('0.0.0'); + +create table {{schema}}.migration +( name text not null constraint migration_pkey primary key +, applied_at_version text not null +, applied_at timestamptz not null default pg_catalog.clock_timestamp() +); diff --git a/packages/space/migrate/sql.d.ts b/packages/space/migrate/sql.d.ts new file mode 100644 index 0000000..89b092e --- /dev/null +++ b/packages/space/migrate/sql.d.ts @@ -0,0 +1,4 @@ +declare module "*.sql" { + const content: string; + export default content; +} diff --git a/packages/space/package.json b/packages/space/package.json new file mode 100644 index 0000000..92f1a9d --- /dev/null +++ b/packages/space/package.json @@ -0,0 +1,9 @@ +{ + "name": "@memory.build/space", + "version": "0.2.5", + "private": true, + "type": "module", + "dependencies": { + "@pydantic/logfire-node": "^0.13.1" + } +} diff --git a/packages/space/slug.ts b/packages/space/slug.ts new file mode 100644 index 0000000..067b323 --- /dev/null +++ b/packages/space/slug.ts @@ -0,0 +1,18 @@ +const SPACE_SCHEMA_RE = /^me_[a-z0-9]{12}$/; +const SLUG_RE = /^[a-z0-9]{12}$/; + +export function isValidSpaceSchema(name: string): boolean { + return SPACE_SCHEMA_RE.test(name); +} + +export function isValidSlug(slug: string): boolean { + return SLUG_RE.test(slug); +} + +export function slugToSchema(slug: string): string { + return `me_${slug}`; +} + +export function schemaToSlug(schema: string): string { + return schema.slice(3); +} diff --git a/packages/space/tsconfig.json b/packages/space/tsconfig.json new file mode 100644 index 0000000..23b1d27 --- /dev/null +++ b/packages/space/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../../tsconfig.json", + "include": ["**/*.ts", "**/*.d.ts"] +} diff --git a/packages/space/version.ts b/packages/space/version.ts new file mode 100644 index 0000000..1b92647 --- /dev/null +++ b/packages/space/version.ts @@ -0,0 +1 @@ +export const SPACE_SCHEMA_VERSION = "0.0.1"; diff --git a/scripts/migrate-db.ts b/scripts/migrate-db.ts new file mode 100644 index 0000000..471deca --- /dev/null +++ b/scripts/migrate-db.ts @@ -0,0 +1,143 @@ +#!/usr/bin/env bun +import { CORE_SCHEMA_VERSION, migrateCore } from "@memory.build/core"; +import { + bootstrapSpaceDatabase, + migrateSpace, + SPACE_SCHEMA_VERSION, + slugToSchema, +} from "@memory.build/space"; +import { SQL } from "bun"; + +const DEFAULT_DATABASE_URL = "postgresql://postgres@127.0.0.1:5432/postgres"; +const DEFAULT_SPACE_SLUG = "dev000000001"; + +type Mode = "all" | "core" | "space-db" | "space"; + +function usage(): string { + return `Usage: ./bun run migrate:db [all|core|space-db|space] + +Environment: + DATABASE_URL Postgres connection string. Falls back to ENGINE_DATABASE_URL, then ${DEFAULT_DATABASE_URL} + SPACE_SLUG Space slug to migrate. Defaults to ${DEFAULT_SPACE_SLUG} + +Modes: + all Migrate core, prepare database for spaces, and migrate the dev space. Default. + core Migrate only the core schema. + space-db Prepare only the physical database for spaces. + space Prepare the database for spaces and migrate one space. +`; +} + +function parseMode(arg: string | undefined): Mode { + if (!arg) return "all"; + if (arg === "--help" || arg === "-h") { + console.log(usage()); + process.exit(0); + } + if ( + arg === "all" || + arg === "core" || + arg === "space-db" || + arg === "space" + ) { + return arg; + } + console.error(`Invalid migration mode: ${arg}`); + console.error(usage()); + process.exit(1); +} + +function databaseUrl(): string { + return ( + process.env.DATABASE_URL ?? + process.env.ENGINE_DATABASE_URL ?? + DEFAULT_DATABASE_URL + ); +} + +async function main(): Promise { + const mode = parseMode(process.argv[2]); + const url = databaseUrl(); + const spaceSlug = process.env.SPACE_SLUG ?? DEFAULT_SPACE_SLUG; + const sql = new SQL(url); + + console.log(`Database: ${url}`); + console.log(`Mode: ${mode}`); + console.log(`Space slug: ${spaceSlug}`); + console.log(`Core schema version: ${CORE_SCHEMA_VERSION}`); + console.log(`Space schema version: ${SPACE_SCHEMA_VERSION}`); + console.log(""); + + try { + if (mode === "all" || mode === "core") { + await migrateCore(sql, { logSqlFiles: true }); + console.log("Migrated core."); + } + + if (mode === "all" || mode === "space-db" || mode === "space") { + await bootstrapSpaceDatabase(sql); + console.log("Prepared database for spaces."); + } + + if (mode === "all" || mode === "space") { + await migrateSpace(sql, { slug: spaceSlug, logSqlFiles: true }); + console.log(`Migrated space ${slugToSchema(spaceSlug)}.`); + } + } finally { + await sql.close(); + } +} + +main().catch((error) => { + console.error(""); + console.error( + "Migration failed:", + error instanceof Error ? error.message : error, + ); + printErrorDetails(error); + process.exit(1); +}); + +function printErrorDetails(error: unknown): void { + if (!error || typeof error !== "object") return; + + const details = error as Record; + const keys = [ + "name", + "errno", + "code", + "severity", + "detail", + "hint", + "position", + "internalPosition", + "internalQuery", + "where", + "schema", + "table", + "column", + "dataType", + "constraint", + "file", + "line", + "routine", + ]; + const seen = new Set(keys); + const extraKeys = [ + ...Object.getOwnPropertyNames(error), + ...Object.keys(details), + ].filter((key) => !seen.has(key) && key !== "message" && key !== "stack"); + + const entries = [...keys, ...extraKeys] + .map((key) => [key, details[key]] as const) + .filter( + ([, value]) => value !== undefined && value !== null && value !== "", + ); + + if (entries.length === 0) return; + + console.error("Postgres error details:"); + for (const [key, value] of entries) { + console.error(` ${key}: ${String(value)}`); + } +} diff --git a/scripts/package.json b/scripts/package.json index 8bd79d2..8919499 100644 --- a/scripts/package.json +++ b/scripts/package.json @@ -3,7 +3,9 @@ "private": true, "type": "module", "dependencies": { + "@memory.build/core": "workspace:*", "@memory.build/embedding": "workspace:*", + "@memory.build/space": "workspace:*", "yaml": "^2.7.0" } }