Skip to content
Merged
6 changes: 5 additions & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ APP_ENV=test
PORT=3021
GOOGLE_CLIENT_ID=...test.apps.googleusercontent.com
CLIENT_SECRET=GOC...test
REDIRECT_URI=http://localhost:7737/api/auth/google
REDIRECT_URI=http://localhost:7737/api/auth/google

CLICKHOUSE_HOST=http://localhost:8443
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=token_pass
166 changes: 166 additions & 0 deletions api/click-house-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { createClient } from 'npm:@clickhouse/client'
import {
CLICKHOUSE_HOST,
CLICKHOUSE_PASSWORD,
CLICKHOUSE_USER,
} from './lib/env.ts'
import { respond } from './lib/response.ts'
import { log } from './lib/log.ts'
import { ARR, NUM, OBJ, optional, STR, UNION } from './lib/validator.ts'
import { Asserted } from './lib/router.ts'

const LogSchema = OBJ({
timestamp: NUM('The timestamp of the log event'),
trace_id: NUM('A float64 representation of the trace ID'),
span_id: optional(NUM('A float64 representation of the span ID')),
severity_number: NUM('The severity number of the log event'),
attributes: optional(OBJ({}, 'A map of attributes')),
event_name: STR('The name of the event'),
service_version: optional(STR('Service version')),
service_instance_id: optional(STR('Service instance ID')),
}, 'A log event')
const LogsInputSchema = UNION(
LogSchema,
ARR(LogSchema, 'An array of log events'),
)

type Log = Asserted<typeof LogSchema>
type LogsInput = Asserted<typeof LogsInputSchema>

const client = createClient({
url: CLICKHOUSE_HOST,
username: CLICKHOUSE_USER,
password: CLICKHOUSE_PASSWORD,
compression: {
request: true,
response: true,
},
clickhouse_settings: {
date_time_input_format: 'best_effort',
},
})

const numberToHex128 = (() => {
const alphabet = new TextEncoder().encode('0123456789abcdef')
const output = new Uint8Array(16)
const view = new DataView(new Uint8Array(8).buffer)
const dec = new TextDecoder()
return (id: number) => {
view.setFloat64(0, id, false)
let i = -1
while (++i < 8) {
const x = view.getUint8(i)
output[i * 2] = alphabet[x >> 4]
output[i * 2 + 1] = alphabet[x & 0xF]
}
return dec.decode(output)
}
})()

async function insertLogs(
service_name: string,
data: LogsInput,
) {
const logsToInsert = Array.isArray(data) ? data : [data]
if (logsToInsert.length === 0) throw respond.NoContent()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ok to throw a response ?


const rows = logsToInsert.map((log) => {
const traceHex = numberToHex128(log.trace_id)
const spanHex = numberToHex128(log.span_id ?? log.trace_id)
return {
...log,
timestamp: new Date(log.timestamp),
attributes: log.attributes ?? {},
service_name: service_name,
trace_id: traceHex,
span_id: spanHex,
}
})

log.debug('Inserting logs into ClickHouse', { rows })

try {
await client.insert({ table: 'logs', values: rows, format: 'JSONEachRow' })
return respond.OK()
} catch (error) {
log.error('Error inserting logs into ClickHouse:', { error })
throw respond.InternalServerError()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, shouldn't we just return in that case ?

}
}

async function getLogs({
resource,
severity_number,
start_date,
end_date,
sort_by,
sort_order,
search,
}: {
resource: string
severity_number?: string
start_date?: string
end_date?: string
sort_by?: string
sort_order?: 'ASC' | 'DESC'
search?: Record<string, string>
}) {
const queryParts: string[] = []
const queryParams: Record<string, unknown> = { service_name: resource }

queryParts.push('service_name = {service_name:String}')
queryParams.service_name = resource

if (severity_number) {
queryParts.push('severity_number = {severity_number:UInt8}')
queryParams.severity_number = severity_number
}

if (start_date) {
queryParts.push('timestamp >= {start_date:DateTime}')
queryParams.start_date = new Date(start_date)
}

if (end_date) {
queryParts.push('timestamp <= {end_date:DateTime}')
queryParams.end_date = new Date(end_date)
}

if (search) {
if (search.trace_id) {
queryParts.push('trace_id = {trace_id:String}')
queryParams.trace_id = search.trace_id
}
if (search.span_id) {
queryParts.push('span_id = {span_id:String}')
queryParams.span_id = search.span_id
}
if (search.event_name) {
queryParts.push('event_name = {event_name:String}')
queryParams.event_name = search.event_name
}
}

const query = `
SELECT *
FROM logs
WHERE ${queryParts.join(' AND ')}
${sort_by ? `ORDER BY ${sort_by} ${sort_order || 'DESC'}` : ''}
LIMIT 1000
`

try {
const resultSet = await client.query({
query,
query_params: queryParams,
format: 'JSON',
})

return (await resultSet.json<Log>()).data
} catch (error) {
log.error('Error querying logs from ClickHouse:', { error })
throw respond.InternalServerError()
}
}

export { client, getLogs, insertLogs, LogSchema, LogsInputSchema }
2 changes: 2 additions & 0 deletions api/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type RequestContext = {
readonly user: User | undefined
readonly trace: number
readonly span: number | undefined
resource: string | undefined
}

// we set default values so we don't have to check everytime if they exists
Expand All @@ -33,6 +34,7 @@ export const makeContext = (
cookies: {},
user: undefined,
span: undefined,
resource: undefined,
url,
req,
...extra,
Expand Down
13 changes: 13 additions & 0 deletions api/lib/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,16 @@ export const ORIGIN = new URL(REDIRECT_URI).origin

export const SECRET = env.SECRET ||
'iUokBru8WPSMAuMspijlt7F-Cnpqyg84F36b1G681h0'

export const CLICKHOUSE_HOST = env.CLICKHOUSE_HOST
if (!CLICKHOUSE_HOST) {
throw Error('CLICKHOUSE_HOST: field required in the env')
}
export const CLICKHOUSE_USER = env.CLICKHOUSE_USER
if (!CLICKHOUSE_USER) {
throw Error('CLICKHOUSE_USER: field required in the env')
}
export const CLICKHOUSE_PASSWORD = env.CLICKHOUSE_PASSWORD
if (!CLICKHOUSE_PASSWORD) {
throw Error('CLICKHOUSE_PASSWORD: field required in the env')
}
24 changes: 12 additions & 12 deletions api/lib/json_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ const batch = async <T>(
await Promise.all(pool)
}

export type BaseRecord = { createdAt?: number; updatedAt?: number }
export type BaseRecord = { createdAt: number; updatedAt: number }

export async function createCollection<
T extends Record<PropertyKey, unknown> & BaseRecord,
T extends Record<PropertyKey, unknown>,
K extends keyof T,
>({ name, primaryKey }: CollectionOptions<T, K>) {
const dir = join(DB_DIR, name)
Expand Down Expand Up @@ -66,34 +66,34 @@ export async function createCollection<
const id = data[primaryKey]
if (!id) throw Error(`Missing primary key ${primaryKey}`)
if (records.has(id)) throw Error(`${id} already exists`)
Object.assign(data, { createdAt: Date.now() })
Object.assign(data, { createdAt: Date.now(), updatedAt: Date.now() })
records.set(id, data)
await saveRecord(data)

return data
return data as T & BaseRecord
},

[Symbol.iterator]: () => records[Symbol.iterator],
keys: () => records.keys(),
values: () => records.values(),
entries: () => records.entries(),
get: (id: T[K]) => records.get(id),
values: () => records.values() as MapIterator<T & BaseRecord>,
entries: () => records.entries() as MapIterator<[T[K], T & BaseRecord]>,
get: (id: T[K]) => records.get(id) as T & BaseRecord | undefined,
assert: (id: T[K]) => {
const match = records.get(id)
if (match) return match
if (match) return match as T & BaseRecord
throw new Deno.errors.NotFound(`record ${id} not found`)
},
find: (predicate: (record: T) => unknown) =>
records.values().find(predicate),
records.values().find(predicate) as T & BaseRecord | undefined,
filter: (predicate: (record: T) => unknown) =>
records.values().filter(predicate).toArray(),
records.values().filter(predicate).toArray() as (T & BaseRecord)[],
async update(id: T[K], changes: Partial<Omit<T, K>>) {
const record = records.get(id)
if (!record) throw new Deno.errors.NotFound(`record ${id} not found`)
const updated = { ...record, ...changes, _updatedAt: Date.now() } as T
const updated = { ...record, ...changes, updatedAt: Date.now() } as T
records.set(id, updated)
await saveRecord(updated)
return updated
return updated as T & BaseRecord
},

async delete(id: T[K]) {
Expand Down
36 changes: 36 additions & 0 deletions api/lib/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ type DefList<T extends readonly (string | number)[]> = {
assert: (value: unknown) => T[number]
}

type DefUnion<T extends readonly Def[]> = {
type: 'union'
of: T
report: Validator<DefUnion<T>>
optional?: boolean
description?: string
assert: (value: unknown) => ReturnType<T[number]['assert']>
}

type DefObject<T extends Record<string, Def>> = {
type: 'object'
properties: { [K in keyof T]: T[K] }
Expand Down Expand Up @@ -69,6 +78,7 @@ export type DefBase =
| DefArray<any>
| DefObject<Record<string, any>>
| DefList<any>
| DefUnion<any>

type OptionalAssert<T extends Def['assert']> = (
value: unknown,
Expand Down Expand Up @@ -258,6 +268,32 @@ export const LIST = <const T extends readonly (string | number)[]>(
description,
})

export const UNION = <T extends readonly Def[]>(...types: T): DefUnion<T> => ({
type: 'union',
of: types,
report: (value: unknown, path: (string | number)[] = []) => {
const failures: ValidatorFailure<DefUnion<T>>[] = []
for (const type of types) {
const result = type.report(value, path)
if (result.length === 0) return []
failures.push(...result)
}
return failures
},
assert: (value: unknown): ReturnType<T[number]['assert']> => {
for (const type of types) {
try {
return type.assert(value)
} catch {
// Ignore
}
}
throw new Error(
`Invalid value. Expected one of: ${types.map((t) => t.type).join(', ')}`,
)
},
})

// const Article = OBJ({
// id: NUM("Unique identifier for the article"),
// title: STR("Title of the article"),
Expand Down
Loading
Loading