diff --git a/api/lib/env.ts b/api/lib/env.ts index b52700a..1f5169e 100644 --- a/api/lib/env.ts +++ b/api/lib/env.ts @@ -39,3 +39,8 @@ export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD if (!CLICKHOUSE_PASSWORD) { throw Error('CLICKHOUSE_PASSWORD: field required in the env') } + +// Optional interval (ms) for refreshing external SQL database schemas +// Defaults to 24 hours +export const DB_SCHEMA_REFRESH_MS = Number(env.DB_SCHEMA_REFRESH_MS) || + 24 * 60 * 60 * 1000 diff --git a/api/routes.ts b/api/routes.ts index 58f838d..e7b20a2 100644 --- a/api/routes.ts +++ b/api/routes.ts @@ -2,6 +2,7 @@ import { makeRouter, route } from '/api/lib/router.ts' import type { RequestContext } from '/api/lib/context.ts' import { handleGoogleCallback, initiateGoogleAuth } from './auth.ts' import { + DatabaseSchemasCollection, DeploymentDef, DeploymentsCollection, ProjectsCollection, @@ -333,6 +334,37 @@ const defs = { output: deploymentOutput, description: 'Regenerate a deployment token', }), + 'GET/api/deployment/schema': route({ + authorize: withUserSession, + fn: (_ctx, { url }) => { + const dep = DeploymentsCollection.get(url) + if (!dep) throw respond.NotFound({ message: 'Deployment not found' }) + if (!dep.databaseEnabled) { + throw respond.BadRequest({ + message: 'Database not enabled for deployment', + }) + } + const schema = DatabaseSchemasCollection.get(url) + if (!schema) throw respond.NotFound({ message: 'Schema not cached yet' }) + return schema + }, + input: OBJ({ url: STR('Deployment URL') }), + output: OBJ({ + deploymentUrl: STR('Deployment url (matches deployment.url)'), + dialect: STR('Detected SQL dialect'), + refreshedAt: STR('ISO datetime of last refresh'), + tables: ARR(OBJ({ + columns: ARR(OBJ({ + name: STR('Column name'), + type: STR('Column data type'), + ordinal: NUM('Column ordinal position'), + })), + schema: optional(STR('Schema name')), + table: STR('Table name'), + })), + }, 'Database schema cache for a deployment'), + description: 'Get cached database schema for a deployment', + }), 'DELETE/api/deployment': route({ authorize: withAdminSession, fn: async (_ctx, input) => { diff --git a/api/schema.ts b/api/schema.ts index 78da210..959ad69 100644 --- a/api/schema.ts +++ b/api/schema.ts @@ -1,4 +1,4 @@ -import { ARR, BOOL, OBJ, optional, STR } from './lib/validator.ts' +import { ARR, BOOL, NUM, OBJ, optional, STR } from './lib/validator.ts' import { Asserted } from './lib/router.ts' import { createCollection } from './lib/json_store.ts' @@ -39,6 +39,25 @@ export const DeploymentDef = OBJ({ }, 'The deployment schema definition') export type Deployment = Asserted +// A flattened representation of a remote SQL database logical schema for a deployment +// "dialect" is a best-effort detection (postgres | mysql | sqlite | sqlserver | oracle | duckdb | unknown) +// "tables" is a JSON stringified array of { schema?, table, columns: [{ name, type, ordinal }] } +export const DatabaseSchemaDef = OBJ({ + deploymentUrl: STR('Deployment url (matches deployment.url)'), + dialect: STR('Detected SQL dialect'), + refreshedAt: STR('ISO datetime of last refresh'), + tables: ARR(OBJ({ + columns: ARR(OBJ({ + name: STR(), + type: STR(), + ordinal: NUM(), + })), + schema: optional(STR()), + table: STR(), + })), +}, 'Database schema cache for a deployment') +export type DatabaseSchema = Asserted + export const UsersCollection = await createCollection( { name: 'users', primaryKey: 'userEmail' }, ) @@ -60,3 +79,8 @@ export const DeploymentsCollection = await createCollection< >( { name: 'deployments', primaryKey: 'url' }, ) + +export const DatabaseSchemasCollection = await createCollection< + DatabaseSchema, + 'deploymentUrl' +>({ name: 'db_schemas', primaryKey: 'deploymentUrl' }) diff --git a/api/server.ts b/api/server.ts index 3460aef..a5ec8c1 100644 --- a/api/server.ts +++ b/api/server.ts @@ -9,6 +9,7 @@ import { type RequestContext, requestContext } from '/api/lib/context.ts' import { join } from 'jsr:@std/path/join' import { serveDir } from 'jsr:@std/http/file-server' import { PORT } from './lib/env.ts' +import { startSchemaRefreshLoop } from './sql.ts' const isProd = Deno.args.includes('--env=prod') const staticDir = isProd @@ -97,6 +98,13 @@ export const fetch = async (req: Request) => { log.info('server-start') +// Start periodic DB schema refresh (non-blocking) +try { + startSchemaRefreshLoop() +} catch (err) { + log.error('schema-loop-start-failed', { err }) +} + Deno.serve({ port: PORT, }, fetch) diff --git a/api/sql.ts b/api/sql.ts new file mode 100644 index 0000000..f58724c --- /dev/null +++ b/api/sql.ts @@ -0,0 +1,131 @@ +import { DatabaseSchemasCollection, DeploymentsCollection } from './schema.ts' +import { DB_SCHEMA_REFRESH_MS } from './lib/env.ts' +import { log } from './lib/log.ts' + +async function runSQL( + endpoint: string, + token: string, + query: string, + params?: unknown, +) { + const res = await fetch(endpoint, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ query, params }), + }) + if (!res.ok) throw Error(`sql endpoint error ${res.status}`) + const data = await res.json() + + return data +} + +// Dialect detection attempts (run first successful) +const DETECTION_QUERIES: { name: string; sql: string; matcher: RegExp }[] = [ + { + name: 'sqlite', + sql: 'SELECT sqlite_version() as v', + matcher: /\d+\.\d+\.\d+/, + }, +] + +async function detectDialect(endpoint: string, token: string): Promise { + for (const d of DETECTION_QUERIES) { + try { + const rows = await runSQL(endpoint, token, d.sql) + log.debug('dialect-detection', { dialect: d.name, rows }) + if (rows.length) { + const text = JSON.stringify(rows[0]) + if (d.matcher.test(text)) return d.name + } + } catch { /* ignore */ } + } + return 'unknown' +} + +// Introspection queries per dialect returning columns list +// Standardized output fields: table_schema (nullable), table_name, column_name, data_type, ordinal_position +const INTROSPECTION: Record = { + sqlite: + `SELECT NULL AS table_schema, m.name AS table_name, p.name AS column_name, p.type AS data_type, p.cid + 1 AS ordinal_position FROM sqlite_master m JOIN pragma_table_info(m.name) p WHERE m.type = 'table' AND m.name NOT LIKE 'sqlite_%' ORDER BY m.name, p.cid`, + unknown: + `SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns ORDER BY table_schema, table_name, ordinal_position`, +} + +async function fetchSchema(endpoint: string, token: string, dialect: string) { + const sql = INTROSPECTION[dialect] ?? INTROSPECTION.unknown + return await runSQL(endpoint, token, sql) +} + +type ColumnInfo = { name: string; type: string; ordinal: number } +type TableInfo = { + schema: string | undefined + table: string + columns: ColumnInfo[] +} + +export async function refreshOneSchema( + dep: ReturnType, +) { + if (!dep || !dep.databaseEnabled || !dep.sqlEndpoint || !dep.sqlToken) return + try { + const dialect = await detectDialect(dep.sqlEndpoint, dep.sqlToken) + const rows = await fetchSchema(dep.sqlEndpoint, dep.sqlToken, dialect) + // group rows + const tableMap = new Map() + for (const r of rows) { + const schema = (r.table_schema as string) || undefined + const table = r.table_name as string + if (!table) continue + const key = (schema ? schema + '.' : '') + table + if (!tableMap.has(key)) tableMap.set(key, { schema, table, columns: [] }) + tableMap.get(key)!.columns.push({ + name: String(r.column_name), + type: String(r.data_type || ''), + ordinal: Number(r.ordinal_position || 0), + }) + } + const tables = [...tableMap.values()].map((t) => ({ + ...t, + columns: t.columns.sort((a, b) => a.ordinal - b.ordinal), + })) + const payload = { + deploymentUrl: dep.url, + dialect, + refreshedAt: new Date().toISOString(), + tables: tables, + } + const existing = DatabaseSchemasCollection.get(dep.url) + if (existing) { + await DatabaseSchemasCollection.update(dep.url, payload) + } else { + await DatabaseSchemasCollection.insert(payload) + } + log.info('schema-refreshed', { + deployment: dep.url, + dialect, + tables: tables.length, + }) + } catch (err) { + log.error('schema-refresh-failed', { deployment: dep.url, err }) + } +} + +export async function refreshAllSchemas() { + for (const dep of DeploymentsCollection.values()) { + await refreshOneSchema(dep) + } +} + +let intervalHandle: number | undefined +export function startSchemaRefreshLoop() { + if (intervalHandle) return + // initial kick (non-blocking) + refreshAllSchemas() + intervalHandle = setInterval(() => { + refreshAllSchemas() + }, DB_SCHEMA_REFRESH_MS) as unknown as number + log.info('schema-refresh-loop-started', { everyMs: DB_SCHEMA_REFRESH_MS }) +}