Skip to content

Commit ca1dfe5

Browse files
authored
Implementations Of The Deployment Database Introspection Logic (#65)
* feat(schema): add endpoint to fetch cached database schema and implement schema refresh loop * refactor(sql): remove unused dialect detection queries and introspection SQL for unsupported databases
1 parent d0a7231 commit ca1dfe5

File tree

5 files changed

+201
-1
lines changed

5 files changed

+201
-1
lines changed

api/lib/env.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD
3939
if (!CLICKHOUSE_PASSWORD) {
4040
throw Error('CLICKHOUSE_PASSWORD: field required in the env')
4141
}
42+
43+
// Optional interval (ms) for refreshing external SQL database schemas.
// Falls back to 24 hours when the variable is unset, not numeric, or not a
// positive finite number (a zero/negative delay would make the timer fire
// continuously).
export const DB_SCHEMA_REFRESH_MS = (() => {
  const fallbackMs = 24 * 60 * 60 * 1000
  const parsedMs = Number(env.DB_SCHEMA_REFRESH_MS)
  return Number.isFinite(parsedMs) && parsedMs > 0 ? parsedMs : fallbackMs
})()

api/routes.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { makeRouter, route } from '/api/lib/router.ts'
22
import type { RequestContext } from '/api/lib/context.ts'
33
import { handleGoogleCallback, initiateGoogleAuth } from './auth.ts'
44
import {
5+
DatabaseSchemasCollection,
56
DeploymentDef,
67
DeploymentsCollection,
78
ProjectsCollection,
@@ -333,6 +334,37 @@ const defs = {
333334
output: deploymentOutput,
334335
description: 'Regenerate a deployment token',
335336
}),
337+
'GET/api/deployment/schema': route({
  authorize: withUserSession,
  // Returns the cached schema entry stored under the deployment url.
  // Fails with NotFound when the deployment or its cache entry is missing,
  // and with BadRequest when the deployment has no database enabled.
  fn: (_ctx, { url }) => {
    const deployment = DeploymentsCollection.get(url)
    if (!deployment) {
      throw respond.NotFound({ message: 'Deployment not found' })
    }
    if (!deployment.databaseEnabled) {
      throw respond.BadRequest({
        message: 'Database not enabled for deployment',
      })
    }
    const cached = DatabaseSchemasCollection.get(url)
    if (cached) return cached
    throw respond.NotFound({ message: 'Schema not cached yet' })
  },
  input: OBJ({ url: STR('Deployment URL') }),
  output: OBJ({
    deploymentUrl: STR('Deployment url (matches deployment.url)'),
    dialect: STR('Detected SQL dialect'),
    refreshedAt: STR('ISO datetime of last refresh'),
    tables: ARR(OBJ({
      columns: ARR(OBJ({
        name: STR('Column name'),
        type: STR('Column data type'),
        ordinal: NUM('Column ordinal position'),
      })),
      schema: optional(STR('Schema name')),
      table: STR('Table name'),
    })),
  }, 'Database schema cache for a deployment'),
  description: 'Get cached database schema for a deployment',
}),
336368
'DELETE/api/deployment': route({
337369
authorize: withAdminSession,
338370
fn: async (_ctx, input) => {

api/schema.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ARR, BOOL, OBJ, optional, STR } from './lib/validator.ts'
1+
import { ARR, BOOL, NUM, OBJ, optional, STR } from './lib/validator.ts'
22
import { Asserted } from './lib/router.ts'
33
import { createCollection } from './lib/json_store.ts'
44

@@ -39,6 +39,25 @@ export const DeploymentDef = OBJ({
3939
}, 'The deployment schema definition')
4040
export type Deployment = Asserted<typeof DeploymentDef>
4141

42+
// A flattened representation of a remote SQL database logical schema for a deployment
// "dialect" is a best-effort detected dialect name (e.g. 'sqlite'), or 'unknown' when no detection matched
// "tables" is an array of { schema?, table, columns: [{ name, type, ordinal }] }
export const DatabaseSchemaDef = OBJ({
  deploymentUrl: STR('Deployment url (matches deployment.url)'),
  dialect: STR('Detected SQL dialect'),
  refreshedAt: STR('ISO datetime of last refresh'),
  tables: ARR(OBJ({
    columns: ARR(OBJ({
      name: STR('Column name'),
      type: STR('Column data type'),
      ordinal: NUM('Column ordinal position'),
    })),
    schema: optional(STR('Schema name')),
    table: STR('Table name'),
  })),
}, 'Database schema cache for a deployment')
export type DatabaseSchema = Asserted<typeof DatabaseSchemaDef>
60+
4261
// Collection of users created under the name 'users', keyed by `userEmail`
export const UsersCollection = await createCollection<User, 'userEmail'>({
  name: 'users',
  primaryKey: 'userEmail',
})
@@ -60,3 +79,8 @@ export const DeploymentsCollection = await createCollection<
6079
>(
6180
{ name: 'deployments', primaryKey: 'url' },
6281
)
82+
83+
// Collection of cached database schemas created under the name 'db_schemas',
// keyed by `deploymentUrl`
export const DatabaseSchemasCollection = await createCollection<
  DatabaseSchema,
  'deploymentUrl'
>(
  { name: 'db_schemas', primaryKey: 'deploymentUrl' },
)

api/server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { type RequestContext, requestContext } from '/api/lib/context.ts'
99
import { join } from 'jsr:@std/path/join'
1010
import { serveDir } from 'jsr:@std/http/file-server'
1111
import { PORT } from './lib/env.ts'
12+
import { startSchemaRefreshLoop } from './sql.ts'
1213

1314
const isProd = Deno.args.includes('--env=prod')
1415
const staticDir = isProd
@@ -97,6 +98,13 @@ export const fetch = async (req: Request) => {
9798

9899
log.info('server-start')

// Kick off the periodic DB schema refresh. The call is synchronous and does
// not wait for any refresh to finish; a failure to start is logged rather
// than preventing the server from listening.
try {
  startSchemaRefreshLoop()
} catch (err) {
  log.error('schema-loop-start-failed', { err })
}

Deno.serve({ port: PORT }, fetch)

api/sql.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { DatabaseSchemasCollection, DeploymentsCollection } from './schema.ts'
2+
import { DB_SCHEMA_REFRESH_MS } from './lib/env.ts'
3+
import { log } from './lib/log.ts'
4+
5+
/**
 * POSTs a SQL query to a SQL endpoint and returns the decoded JSON rows.
 *
 * The request body is the JSON object `{ query, params }` and the token is
 * sent as a Bearer `Authorization` header.
 *
 * @param endpoint URL the query is POSTed to.
 * @param token Bearer token placed in the `Authorization` header.
 * @param query SQL text to execute.
 * @param params Optional query parameters, forwarded as-is.
 * @returns The JSON response body, which must be an array (one entry per row).
 * @throws Error when the request is aborted by the timeout, the endpoint
 * answers with a non-2xx status, or the payload is not a JSON array.
 */
async function runSQL(
  endpoint: string,
  token: string,
  query: string,
  params?: unknown,
): Promise<Record<string, unknown>[]> {
  // Bound the whole exchange (headers + body) so an unresponsive endpoint
  // cannot stall callers indefinitely.
  const timeoutMs = 30000
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), timeoutMs)
  try {
    const res = await fetch(endpoint, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${token}`,
      },
      body: JSON.stringify({ query, params }),
      signal: controller.signal,
    })
    if (!res.ok) {
      // Discard the unread body so the underlying connection is released
      await res.body?.cancel()
      throw Error(`sql endpoint error ${res.status}`)
    }
    const data: unknown = await res.json()
    // Callers iterate the result and read `.length`, so reject anything else
    // here with a clear message instead of failing later.
    if (!Array.isArray(data)) {
      throw Error('sql endpoint returned a non-array payload')
    }
    return data
  } finally {
    clearTimeout(timer)
  }
}
24+
25+
// Dialect detection probes, tried in order: the first probe whose query
// succeeds and whose first row matches `matcher` decides the dialect
const DETECTION_QUERIES: { name: string; sql: string; matcher: RegExp }[] = [
  {
    name: 'sqlite',
    sql: 'SELECT sqlite_version() as v',
    matcher: /\d+\.\d+\.\d+/,
  },
]

/**
 * Best-effort detection of the SQL dialect behind an endpoint.
 *
 * Runs each probe in DETECTION_QUERIES and returns the name of the first one
 * whose first result row (JSON-stringified) matches the probe's matcher.
 * A probe that throws is treated as "not this dialect" and never propagates.
 *
 * @param endpoint URL of the SQL endpoint.
 * @param token Bearer token for the endpoint.
 * @returns The matching probe name, or 'unknown' when no probe matches.
 */
async function detectDialect(endpoint: string, token: string): Promise<string> {
  for (const probe of DETECTION_QUERIES) {
    try {
      const rows = await runSQL(endpoint, token, probe.sql)
      log.debug('dialect-detection', { dialect: probe.name, rows })
      if (!Array.isArray(rows) || rows.length === 0) continue
      if (probe.matcher.test(JSON.stringify(rows[0]))) return probe.name
    } catch (err) {
      // Expected for every dialect that does not support the probe query;
      // keep going, but leave a trace instead of discarding the error.
      log.debug('dialect-detection-failed', { dialect: probe.name, err })
    }
  }
  return 'unknown'
}
47+
48+
// Column-listing SQL keyed by dialect name.
// Each query yields one row per column with the standardized fields:
// table_schema (nullable), table_name, column_name, data_type, ordinal_position
const INTROSPECTION: Record<string, string> = {
  sqlite:
    `SELECT NULL AS table_schema, m.name AS table_name, p.name AS column_name, p.type AS data_type, p.cid + 1 AS ordinal_position FROM sqlite_master m JOIN pragma_table_info(m.name) p WHERE m.type = 'table' AND m.name NOT LIKE 'sqlite_%' ORDER BY m.name, p.cid`,
  unknown:
    `SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns ORDER BY table_schema, table_name, ordinal_position`,
}

/**
 * Runs the introspection query registered for `dialect`, falling back to the
 * `unknown` query when the dialect has no entry, and returns the result of
 * runSQL unchanged.
 */
async function fetchSchema(endpoint: string, token: string, dialect: string) {
  const query = INTROSPECTION[dialect] ?? INTROSPECTION.unknown
  const rows = await runSQL(endpoint, token, query)
  return rows
}
61+
62+
type ColumnInfo = { name: string; type: string; ordinal: number }
type TableInfo = {
  schema: string | undefined
  table: string
  columns: ColumnInfo[]
}

// Groups flat introspection rows (one row per column) into tables, with each
// table's columns sorted by ordinal position. Rows that are not objects or
// have no table name are skipped.
function groupRowsIntoTables(rows: readonly unknown[]): TableInfo[] {
  const tableMap = new Map<string, TableInfo>()
  for (const row of rows) {
    if (row === null || typeof row !== 'object') continue
    const r = row as Record<string, unknown>
    const schema = r.table_schema ? String(r.table_schema) : undefined
    const table = r.table_name ? String(r.table_name) : ''
    if (!table) continue
    // A structured key cannot collide the way "schema.table" does when a
    // schema or table name itself contains a dot.
    const key = JSON.stringify([schema ?? null, table])
    let entry = tableMap.get(key)
    if (!entry) {
      entry = { schema, table, columns: [] }
      tableMap.set(key, entry)
    }
    // Missing or non-numeric ordinals fall back to 0 rather than NaN
    const ordinal = Number(r.ordinal_position)
    entry.columns.push({
      name: String(r.column_name),
      type: String(r.data_type || ''),
      ordinal: Number.isFinite(ordinal) ? ordinal : 0,
    })
  }
  const tables = [...tableMap.values()]
  for (const t of tables) t.columns.sort((a, b) => a.ordinal - b.ordinal)
  return tables
}

/**
 * Refreshes the cached database schema of a single deployment.
 *
 * Does nothing when the deployment is missing, has no database enabled, or
 * lacks a SQL endpoint or token. Otherwise it detects the dialect, runs the
 * introspection query, groups the rows into tables and stores the result in
 * DatabaseSchemasCollection under the deployment url (update when an entry
 * already exists, insert otherwise).
 *
 * Never throws: any failure is logged as 'schema-refresh-failed'.
 *
 * @param dep Deployment record as returned by DeploymentsCollection.get.
 */
export async function refreshOneSchema(
  dep: ReturnType<typeof DeploymentsCollection.get>,
) {
  if (!dep || !dep.databaseEnabled || !dep.sqlEndpoint || !dep.sqlToken) return
  try {
    const dialect = await detectDialect(dep.sqlEndpoint, dep.sqlToken)
    const rows = await fetchSchema(dep.sqlEndpoint, dep.sqlToken, dialect)
    if (!Array.isArray(rows)) {
      throw Error('sql endpoint returned a non-array payload')
    }
    const tables = groupRowsIntoTables(rows)
    const payload = {
      deploymentUrl: dep.url,
      dialect,
      refreshedAt: new Date().toISOString(),
      tables,
    }
    const existing = DatabaseSchemasCollection.get(dep.url)
    if (existing) {
      await DatabaseSchemasCollection.update(dep.url, payload)
    } else {
      await DatabaseSchemasCollection.insert(payload)
    }
    log.info('schema-refreshed', {
      deployment: dep.url,
      dialect,
      tables: tables.length,
    })
  } catch (err) {
    log.error('schema-refresh-failed', { deployment: dep.url, err })
  }
}
115+
116+
/**
 * Refreshes the cached schema of every deployment in DeploymentsCollection,
 * one deployment at a time (each refresh is awaited before the next starts).
 */
export async function refreshAllSchemas() {
  const deployments = DeploymentsCollection.values()
  for (const deployment of deployments) {
    await refreshOneSchema(deployment)
  }
}
121+
122+
let intervalHandle: ReturnType<typeof setInterval> | undefined
// True while a refresh pass is running, so a slow pass is never overlapped
// by the next timer tick
let refreshInFlight = false

// Runs one refresh pass in the background. Skips the tick when the previous
// pass is still running and logs (instead of leaking) any rejection.
function runRefreshCycle() {
  if (refreshInFlight) return
  refreshInFlight = true
  refreshAllSchemas()
    .catch((err) => {
      log.error('schema-refresh-cycle-failed', { err })
    })
    .finally(() => {
      refreshInFlight = false
    })
}

/**
 * Starts the periodic schema refresh: one pass immediately, then one every
 * DB_SCHEMA_REFRESH_MS milliseconds. Returns without waiting for any pass to
 * finish. Calling it again while the loop is already started is a no-op.
 */
export function startSchemaRefreshLoop() {
  if (intervalHandle !== undefined) return
  // initial kick (non-blocking)
  runRefreshCycle()
  intervalHandle = setInterval(runRefreshCycle, DB_SCHEMA_REFRESH_MS)
  log.info('schema-refresh-loop-started', { everyMs: DB_SCHEMA_REFRESH_MS })
}

0 commit comments

Comments
 (0)