Skip to content

Commit 913ff62

Browse files
committed
feat(schema): Enhance log schema with additional fields and improve data types for better logging structure
1 parent b3e0b67 commit 913ff62

5 files changed

Lines changed: 139 additions & 74 deletions

File tree

api/click-house-client.ts

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@ import {
66
} from './lib/env.ts'
77
import { respond } from './lib/response.ts'
88
import { log } from './lib/log.ts'
9-
import { ARR, NUM, OBJ, STR, UNION } from './lib/validator.ts'
9+
import { ARR, NUM, OBJ, optional, STR, UNION } from './lib/validator.ts'
1010
import { Asserted } from './lib/router.ts'
1111

1212
const LogSchema = OBJ({
13-
timestamp: STR(),
14-
trace_id: STR(),
15-
span_id: STR(),
16-
severity_number: NUM(),
17-
attributes: OBJ({}),
18-
event_name: STR(),
19-
context: OBJ({}),
20-
})
21-
22-
const LogsInputSchema = UNION(LogSchema, ARR(LogSchema))
23-
24-
export type Log = Asserted<typeof LogSchema>
13+
timestamp: NUM('The timestamp of the log event'),
14+
trace_id: NUM('A float64 representation of the trace ID'),
15+
span_id: NUM('A float64 representation of the span ID'),
16+
severity_number: NUM('The severity number of the log event'),
17+
attributes: optional(OBJ({}, 'A map of attributes')),
18+
event_name: STR('The name of the event'),
19+
service_version: optional(STR('Service version')),
20+
service_instance_id: optional(STR('Service instance ID')),
21+
}, 'A log event')
22+
const LogsInputSchema = UNION(
23+
LogSchema,
24+
ARR(LogSchema, 'An array of log events'),
25+
)
26+
27+
type Log = Asserted<typeof LogSchema>
2528
type LogsInput = Asserted<typeof LogsInputSchema>
2629

2730
const client = createClient({
@@ -34,26 +37,65 @@ const client = createClient({
3437
},
3538
})
3639

40+
export function float64ToId128(
41+
{ trace, span }: { trace: number; span?: number },
42+
) {
43+
const id128 = new Uint8Array(16)
44+
const view = new DataView(id128.buffer)
45+
view.setFloat64(0, trace, false)
46+
if (span) {
47+
view.setFloat64(8, span, false)
48+
} else {
49+
view.setFloat64(8, trace, false)
50+
}
51+
return id128
52+
}
53+
54+
export function bytesToHex(bytes: Uint8Array) {
55+
return Array.from(bytes).map((b) => b.toString(16).padStart(2, '0')).join('')
56+
}
57+
const escapeSql = (s: unknown) =>
58+
String(s ?? '').replace(/\\/g, '\\\\').replace(/'/g, "''")
59+
3760
async function insertLogs(
38-
resource: string,
61+
service_name: string,
3962
data: LogsInput,
4063
) {
4164
const logsToInsert = Array.isArray(data) ? data : [data]
42-
if (logsToInsert.length === 0) {
43-
throw respond.NoContent()
44-
}
45-
46-
const values = logsToInsert.map((log) => ({
47-
...log,
48-
resource,
49-
}))
65+
if (logsToInsert.length === 0) throw respond.NoContent()
66+
67+
const rows = logsToInsert.map((log) => {
68+
const traceHex = bytesToHex(
69+
float64ToId128({ trace: log.trace_id }),
70+
)
71+
const spanHex = bytesToHex(
72+
float64ToId128({
73+
trace: log.trace_id,
74+
span: log.span_id,
75+
}),
76+
)
77+
78+
const ts = log.timestamp
79+
const attrs = escapeSql(JSON.stringify(log.attributes))
80+
const event = escapeSql(log.event_name)
81+
const svc = escapeSql(service_name)
82+
const svcVer = escapeSql(log.service_version ?? 'unknown')
83+
const svcInst = escapeSql(log.service_instance_id ?? 'unknown')
84+
// trace_id / span_id inserted as binary via unhex('<32hex>')
85+
return `('${svc}','${svcVer}','${svcInst}','${ts}',unhex('${traceHex}'),unhex('${spanHex}'),${log.severity_number},'${attrs}','${event}')`
86+
})
87+
88+
const q = `
89+
INSERT INTO logs
90+
(service_name, service_version, service_instance_id,
91+
timestamp, trace_id, span_id,
92+
severity_number, attributes, event_name)
93+
VALUES
94+
${rows.join(',')}
95+
`.trim()
5096

5197
try {
52-
await client.insert({
53-
table: 'logs',
54-
values,
55-
format: 'JSONEachRow',
56-
})
98+
await client.command({ query: q })
5799
return respond.OK()
58100
} catch (error) {
59101
log.error('Error inserting logs into ClickHouse:', { error })
@@ -89,9 +131,9 @@ async function getLogs({
89131
search?: Record<string, string>
90132
}) {
91133
const queryParts: string[] = []
92-
const queryParams: Record<string, unknown> = { resource }
134+
const queryParams: Record<string, unknown> = { service_name: resource }
93135

94-
queryParts.push('resource = {resource:String}')
136+
queryParts.push('service_name = {service_name:String}')
95137

96138
if (level) {
97139
queryParts.push('severity_number = {level:UInt8}')
@@ -134,11 +176,12 @@ async function getLogs({
134176
try {
135177
const resultSet = await client.query({
136178
query,
179+
137180
query_params: queryParams,
138-
format: 'JSONEachRow',
181+
format: 'JSON',
139182
})
140183

141-
return resultSet.json<Log[]>()
184+
return (await resultSet.json<Log>()).data
142185
} catch (error) {
143186
log.error('Error querying logs from ClickHouse:', { error })
144187
throw respond.InternalServerError()

api/lib/json_store.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ const batch = async <T>(
3232
await Promise.all(pool)
3333
}
3434

35-
export type BaseRecord = { createdAt?: number; updatedAt?: number }
35+
export type BaseRecord = { createdAt: number; updatedAt: number }
3636

3737
export async function createCollection<
38-
T extends Record<PropertyKey, unknown> & BaseRecord,
38+
T extends Record<PropertyKey, unknown>,
3939
K extends keyof T,
4040
>({ name, primaryKey }: CollectionOptions<T, K>) {
4141
const dir = join(DB_DIR, name)
@@ -66,34 +66,34 @@ export async function createCollection<
6666
const id = data[primaryKey]
6767
if (!id) throw Error(`Missing primary key ${primaryKey}`)
6868
if (records.has(id)) throw Error(`${id} already exists`)
69-
Object.assign(data, { createdAt: Date.now() })
69+
Object.assign(data, { createdAt: Date.now(), updatedAt: Date.now() })
7070
records.set(id, data)
7171
await saveRecord(data)
7272

73-
return data
73+
return data as T & BaseRecord
7474
},
7575

7676
[Symbol.iterator]: () => records[Symbol.iterator],
7777
keys: () => records.keys(),
78-
values: () => records.values(),
79-
entries: () => records.entries(),
80-
get: (id: T[K]) => records.get(id),
78+
values: () => records.values() as MapIterator<T & BaseRecord>,
79+
entries: () => records.entries() as MapIterator<[T[K], T & BaseRecord]>,
80+
get: (id: T[K]) => records.get(id) as T & BaseRecord | undefined,
8181
assert: (id: T[K]) => {
8282
const match = records.get(id)
83-
if (match) return match
83+
if (match) return match as T & BaseRecord
8484
throw new Deno.errors.NotFound(`record ${id} not found`)
8585
},
8686
find: (predicate: (record: T) => unknown) =>
87-
records.values().find(predicate),
87+
records.values().find(predicate) as T & BaseRecord | undefined,
8888
filter: (predicate: (record: T) => unknown) =>
89-
records.values().filter(predicate).toArray(),
89+
records.values().filter(predicate).toArray() as (T & BaseRecord)[],
9090
async update(id: T[K], changes: Partial<Omit<T, K>>) {
9191
const record = records.get(id)
9292
if (!record) throw new Deno.errors.NotFound(`record ${id} not found`)
93-
const updated = { ...record, ...changes, _updatedAt: Date.now() } as T
93+
const updated = { ...record, ...changes, updatedAt: Date.now() } as T
9494
records.set(id, updated)
9595
await saveRecord(updated)
96-
return updated
96+
return updated as T & BaseRecord
9797
},
9898

9999
async delete(id: T[K]) {

api/routes.ts

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
LogsInputSchema,
2424
} from './click-house-client.ts'
2525
import { decryptMessage, encryptMessage } from './user.ts'
26+
import { log } from './lib/log.ts'
2627

2728
const withUserSession = ({ user }: RequestContext) => {
2829
if (!user) throw Error('Missing user session')
@@ -35,14 +36,19 @@ const withAdminSession = ({ user }: RequestContext) => {
3536
const withDeploymentSession = async (ctx: RequestContext) => {
3637
const token = ctx.req.headers.get('Authorization')?.replace(/^Bearer /i, '')
3738
if (!token) throw respond.Unauthorized({ message: 'Missing token' })
38-
const message = await decryptMessage(token)
39-
if (!message) throw respond.Unauthorized({ message: 'Invalid token' })
40-
const data = JSON.parse(message)
41-
const dep = DeploymentsCollection.get(data?.url)
42-
if (!dep || dep.tokenSalt !== data?.tokenSalt) {
39+
try {
40+
const message = await decryptMessage(token)
41+
if (!message) throw respond.Unauthorized({ message: 'Invalid token' })
42+
const data = JSON.parse(message)
43+
const dep = DeploymentsCollection.get(data?.url)
44+
if (!dep || dep.tokenSalt !== data?.tokenSalt) {
45+
throw respond.Unauthorized({ message: 'Invalid token' })
46+
}
47+
ctx.resource = dep?.url
48+
} catch (error) {
49+
log.error('Error validating deployment token:', { error })
4350
throw respond.Unauthorized({ message: 'Invalid token' })
4451
}
45-
ctx.resource = dep?.url
4652
}
4753

4854
const deploymentOutput = OBJ({
@@ -229,8 +235,6 @@ const defs = {
229235
return deployments.map(({ tokenSalt: _, ...d }) => {
230236
return {
231237
...d,
232-
createdAt: d.createdAt,
233-
updatedAt: d.updatedAt,
234238
token: undefined,
235239
sqlToken: undefined,
236240
sqlEndpoint: undefined,
@@ -252,8 +256,6 @@ const defs = {
252256
)
253257
return {
254258
...deployment,
255-
createdAt: deployment.createdAt,
256-
updatedAt: deployment.updatedAt,
257259
token,
258260
}
259261
},
@@ -275,8 +277,6 @@ const defs = {
275277
)
276278
return {
277279
...deployment,
278-
createdAt: deployment.createdAt,
279-
updatedAt: deployment.updatedAt,
280280
token,
281281
}
282282
},
@@ -294,8 +294,6 @@ const defs = {
294294
)
295295
return {
296296
...deployment,
297-
createdAt: deployment.createdAt,
298-
updatedAt: deployment.updatedAt,
299297
token,
300298
}
301299
},
@@ -319,8 +317,6 @@ const defs = {
319317
)
320318
return {
321319
...deployment,
322-
createdAt: deployment.createdAt,
323-
updatedAt: deployment.updatedAt,
324320
token,
325321
}
326322
},
@@ -350,9 +346,22 @@ const defs = {
350346
}),
351347
'GET/api/logs': route({
352348
authorize: withUserSession,
353-
fn: async (_ctx, params) => {
354-
const logs = await getLogs(params)
355-
return logs.flat()
349+
fn: (ctx, params) => {
350+
const deployments = DeploymentsCollection.filter((d) =>
351+
d.projectId === params.resource
352+
)
353+
if (deployments.length === 0) {
354+
throw respond.NotFound({ message: 'Deployments not found' })
355+
}
356+
const project = ProjectsCollection.get(deployments[0].projectId)
357+
if (!project) throw respond.NotFound({ message: 'Project not found' })
358+
if (!project.isPublic && !ctx.user?.isAdmin) {
359+
const team = TeamsCollection.find((t) => t.teamId === project.teamId)
360+
if (!team?.teamMembers.includes(ctx.user?.userEmail || '')) {
361+
throw respond.Forbidden({ message: 'Access to project logs denied' })
362+
}
363+
}
364+
return getLogs(params)
356365
},
357366
input: OBJ({
358367
resource: STR('The resource to fetch logs for'),
@@ -363,7 +372,7 @@ const defs = {
363372
sort_order: optional(
364373
LIST(['ASC', 'DESC'], 'The sort order (ASC or DESC)'),
365374
),
366-
search: optional(OBJ({}, 'A map of fields to search by')),
375+
// search: optional(OBJ({}, 'A map of fields to search by')),
367376
}),
368377
output: ARR(LogSchema, 'List of logs'),
369378
description: 'Get logs from ClickHouse',

api/schema.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { ARR, BOOL, OBJ, optional, STR } from './lib/validator.ts'
22
import { Asserted } from './lib/router.ts'
3-
import { BaseRecord, createCollection } from './lib/json_store.ts'
3+
import { createCollection } from './lib/json_store.ts'
44

55
export const UserDef = OBJ({
66
userEmail: STR('The user email address'),
77
userFullName: STR('The user login name'),
88
userPicture: optional(STR('The user profile picture URL')),
99
isAdmin: BOOL('Is the user an admin?'),
1010
}, 'The user schema definition')
11-
export type User = Asserted<typeof UserDef> & BaseRecord
11+
export type User = Asserted<typeof UserDef>
1212

1313
export const TeamDef = OBJ({
1414
teamId: STR('The unique identifier for the team'),
@@ -18,7 +18,7 @@ export const TeamDef = OBJ({
1818
'The list of user emails who are members of the team',
1919
),
2020
}, 'The team schema definition')
21-
export type Team = Asserted<typeof TeamDef> & BaseRecord
21+
export type Team = Asserted<typeof TeamDef>
2222

2323
export const ProjectDef = OBJ({
2424
slug: STR('The unique identifier for the project'),
@@ -27,7 +27,7 @@ export const ProjectDef = OBJ({
2727
isPublic: BOOL('Is the project public?'),
2828
repositoryUrl: optional(STR('The URL of the project repository')),
2929
}, 'The project schema definition')
30-
export type Project = Asserted<typeof ProjectDef> & BaseRecord
30+
export type Project = Asserted<typeof ProjectDef>
3131

3232
export const DeploymentDef = OBJ({
3333
projectId: STR('The ID of the project this deployment belongs to'),
@@ -37,7 +37,7 @@ export const DeploymentDef = OBJ({
3737
sqlEndpoint: optional(STR('The SQL execution endpoint for the database')),
3838
sqlToken: optional(STR('The security token for the SQL endpoint')),
3939
}, 'The deployment schema definition')
40-
export type Deployment = Asserted<typeof DeploymentDef> & BaseRecord
40+
export type Deployment = Asserted<typeof DeploymentDef>
4141

4242
export const UsersCollection = await createCollection<User, 'userEmail'>(
4343
{ name: 'users', primaryKey: 'userEmail' },

0 commit comments

Comments
 (0)