Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/lib/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD
if (!CLICKHOUSE_PASSWORD) {
throw Error('CLICKHOUSE_PASSWORD: field required in the env')
}

// Optional interval (ms) for refreshing external SQL database schemas
// Defaults to 24 hours
export const DB_SCHEMA_REFRESH_MS = Number(env.DB_SCHEMA_REFRESH_MS) ||
24 * 60 * 60 * 1000
32 changes: 32 additions & 0 deletions api/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { makeRouter, route } from '/api/lib/router.ts'
import type { RequestContext } from '/api/lib/context.ts'
import { handleGoogleCallback, initiateGoogleAuth } from './auth.ts'
import {
DatabaseSchemasCollection,
DeploymentDef,
DeploymentsCollection,
ProjectsCollection,
Expand Down Expand Up @@ -333,6 +334,37 @@ const defs = {
output: deploymentOutput,
description: 'Regenerate a deployment token',
}),
'GET/api/deployment/schema': route({
authorize: withUserSession,
fn: (_ctx, { url }) => {
const dep = DeploymentsCollection.get(url)
if (!dep) throw respond.NotFound({ message: 'Deployment not found' })
if (!dep.databaseEnabled) {
throw respond.BadRequest({
message: 'Database not enabled for deployment',
})
}
const schema = DatabaseSchemasCollection.get(url)
if (!schema) throw respond.NotFound({ message: 'Schema not cached yet' })
return schema
},
input: OBJ({ url: STR('Deployment URL') }),
output: OBJ({
deploymentUrl: STR('Deployment url (matches deployment.url)'),
dialect: STR('Detected SQL dialect'),
refreshedAt: STR('ISO datetime of last refresh'),
tables: ARR(OBJ({
columns: ARR(OBJ({
name: STR('Column name'),
type: STR('Column data type'),
ordinal: NUM('Column ordinal position'),
})),
schema: optional(STR('Schema name')),
table: STR('Table name'),
})),
}, 'Database schema cache for a deployment'),
Comment on lines +352 to +365
Copy link

Copilot AI Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output schema definition duplicates the DatabaseSchemaDef from schema.ts. Consider importing and reusing DatabaseSchemaDef to maintain consistency and reduce duplication.

Copilot uses AI. Check for mistakes.
description: 'Get cached database schema for a deployment',
}),
'DELETE/api/deployment': route({
authorize: withAdminSession,
fn: async (_ctx, input) => {
Expand Down
26 changes: 25 additions & 1 deletion api/schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ARR, BOOL, OBJ, optional, STR } from './lib/validator.ts'
import { ARR, BOOL, NUM, OBJ, optional, STR } from './lib/validator.ts'
import { Asserted } from './lib/router.ts'
import { createCollection } from './lib/json_store.ts'

Expand Down Expand Up @@ -39,6 +39,25 @@ export const DeploymentDef = OBJ({
}, 'The deployment schema definition')
export type Deployment = Asserted<typeof DeploymentDef>

// A flattened representation of a remote SQL database logical schema for a deployment
// "dialect" is a best-effort detection (postgres | mysql | sqlite | sqlserver | oracle | duckdb | unknown)
// "tables" is a JSON stringified array of { schema?, table, columns: [{ name, type, ordinal }] }
export const DatabaseSchemaDef = OBJ({
deploymentUrl: STR('Deployment url (matches deployment.url)'),
dialect: STR('Detected SQL dialect'),
refreshedAt: STR('ISO datetime of last refresh'),
tables: ARR(OBJ({
columns: ARR(OBJ({
name: STR(),
type: STR(),
ordinal: NUM(),
})),
schema: optional(STR()),
table: STR(),
})),
}, 'Database schema cache for a deployment')
export type DatabaseSchema = Asserted<typeof DatabaseSchemaDef>

export const UsersCollection = await createCollection<User, 'userEmail'>(
{ name: 'users', primaryKey: 'userEmail' },
)
Expand All @@ -60,3 +79,8 @@ export const DeploymentsCollection = await createCollection<
>(
{ name: 'deployments', primaryKey: 'url' },
)

export const DatabaseSchemasCollection = await createCollection<
DatabaseSchema,
'deploymentUrl'
>({ name: 'db_schemas', primaryKey: 'deploymentUrl' })
8 changes: 8 additions & 0 deletions api/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { type RequestContext, requestContext } from '/api/lib/context.ts'
import { join } from 'jsr:@std/path/join'
import { serveDir } from 'jsr:@std/http/file-server'
import { PORT } from './lib/env.ts'
import { startSchemaRefreshLoop } from './sql.ts'

const isProd = Deno.args.includes('--env=prod')
const staticDir = isProd
Expand Down Expand Up @@ -97,6 +98,13 @@ export const fetch = async (req: Request) => {

log.info('server-start')

// Start periodic DB schema refresh (non-blocking)
try {
startSchemaRefreshLoop()
} catch (err) {
log.error('schema-loop-start-failed', { err })
}

Deno.serve({
port: PORT,
}, fetch)
131 changes: 131 additions & 0 deletions api/sql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { DatabaseSchemasCollection, DeploymentsCollection } from './schema.ts'
import { DB_SCHEMA_REFRESH_MS } from './lib/env.ts'
import { log } from './lib/log.ts'

async function runSQL(
endpoint: string,
token: string,
query: string,
params?: unknown,
) {
const res = await fetch(endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
body: JSON.stringify({ query, params }),
})
if (!res.ok) throw Error(`sql endpoint error ${res.status}`)
const data = await res.json()

return data
}

// Dialect detection attempts (run first successful)
const DETECTION_QUERIES: { name: string; sql: string; matcher: RegExp }[] = [
{
name: 'sqlite',
sql: 'SELECT sqlite_version() as v',
matcher: /\d+\.\d+\.\d+/,
},
]

async function detectDialect(endpoint: string, token: string): Promise<string> {
for (const d of DETECTION_QUERIES) {
try {
const rows = await runSQL(endpoint, token, d.sql)
log.debug('dialect-detection', { dialect: d.name, rows })
if (rows.length) {
const text = JSON.stringify(rows[0])
if (d.matcher.test(text)) return d.name
}
} catch { /* ignore */ }
}
return 'unknown'
}

// Introspection queries per dialect returning columns list
// Standardized output fields: table_schema (nullable), table_name, column_name, data_type, ordinal_position
const INTROSPECTION: Record<string, string> = {
sqlite:
`SELECT NULL AS table_schema, m.name AS table_name, p.name AS column_name, p.type AS data_type, p.cid + 1 AS ordinal_position FROM sqlite_master m JOIN pragma_table_info(m.name) p WHERE m.type = 'table' AND m.name NOT LIKE 'sqlite_%' ORDER BY m.name, p.cid`,
unknown:
`SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns ORDER BY table_schema, table_name, ordinal_position`,
}

async function fetchSchema(endpoint: string, token: string, dialect: string) {
const sql = INTROSPECTION[dialect] ?? INTROSPECTION.unknown
return await runSQL(endpoint, token, sql)
}

type ColumnInfo = { name: string; type: string; ordinal: number }
type TableInfo = {
schema: string | undefined
table: string
columns: ColumnInfo[]
}

export async function refreshOneSchema(
dep: ReturnType<typeof DeploymentsCollection.get>,
) {
if (!dep || !dep.databaseEnabled || !dep.sqlEndpoint || !dep.sqlToken) return
try {
const dialect = await detectDialect(dep.sqlEndpoint, dep.sqlToken)
const rows = await fetchSchema(dep.sqlEndpoint, dep.sqlToken, dialect)
// group rows
const tableMap = new Map<string, TableInfo>()
for (const r of rows) {
const schema = (r.table_schema as string) || undefined
const table = r.table_name as string
if (!table) continue
const key = (schema ? schema + '.' : '') + table
if (!tableMap.has(key)) tableMap.set(key, { schema, table, columns: [] })
tableMap.get(key)!.columns.push({
name: String(r.column_name),
type: String(r.data_type || ''),
ordinal: Number(r.ordinal_position || 0),
})
}
const tables = [...tableMap.values()].map((t) => ({
...t,
columns: t.columns.sort((a, b) => a.ordinal - b.ordinal),
}))
const payload = {
deploymentUrl: dep.url,
dialect,
refreshedAt: new Date().toISOString(),
tables: tables,
}
const existing = DatabaseSchemasCollection.get(dep.url)
if (existing) {
await DatabaseSchemasCollection.update(dep.url, payload)
} else {
await DatabaseSchemasCollection.insert(payload)
}
log.info('schema-refreshed', {
deployment: dep.url,
dialect,
tables: tables.length,
})
} catch (err) {
log.error('schema-refresh-failed', { deployment: dep.url, err })
}
}

export async function refreshAllSchemas() {
for (const dep of DeploymentsCollection.values()) {
await refreshOneSchema(dep)
}
Comment on lines +117 to +119
Copy link

Copilot AI Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sequential processing of deployments could be slow with many deployments. Consider using Promise.allSettled() to refresh schemas concurrently while preventing one failure from blocking others.

Suggested change
for (const dep of DeploymentsCollection.values()) {
await refreshOneSchema(dep)
}
await Promise.allSettled(
Array.from(DeploymentsCollection.values()).map(dep => refreshOneSchema(dep))
);

Copilot uses AI. Check for mistakes.
}

let intervalHandle: number | undefined
export function startSchemaRefreshLoop() {
if (intervalHandle) return
// initial kick (non-blocking)
refreshAllSchemas()
Comment on lines +123 to +126
Copy link

Copilot AI Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial schema refresh call is not awaited, which means the interval starts immediately without waiting for the first refresh to complete. This could lead to overlapping refresh operations if the refresh takes longer than the interval. Consider awaiting this call or adding concurrency protection.

Suggested change
export function startSchemaRefreshLoop() {
if (intervalHandle) return
// initial kick (non-blocking)
refreshAllSchemas()
export async function startSchemaRefreshLoop() {
if (intervalHandle) return
// initial kick (awaited)
await refreshAllSchemas()

Copilot uses AI. Check for mistakes.
intervalHandle = setInterval(() => {
refreshAllSchemas()
}, DB_SCHEMA_REFRESH_MS) as unknown as number
log.info('schema-refresh-loop-started', { everyMs: DB_SCHEMA_REFRESH_MS })
}