diff --git a/apps/backend/src/app/api/cron/aggregate-analytics/route.ts b/apps/backend/src/app/api/cron/aggregate-analytics/route.ts new file mode 100644 index 00000000..0b62b96d --- /dev/null +++ b/apps/backend/src/app/api/cron/aggregate-analytics/route.ts @@ -0,0 +1,26 @@ +import { NextRequest, NextResponse } from 'next/server'; +import { withCronAuth } from '@/lib/api/cron-auth'; +import { analyticsAggregationService } from '@/services/analytics-aggregation.service'; + +async function handleAggregateAnalytics(_req: NextRequest) { + try { + const [hourly, daily] = await Promise.all([ + analyticsAggregationService.aggregate('1h'), + analyticsAggregationService.aggregate('24h'), + ]); + + return NextResponse.json({ + success: true, + hourly: { bucketsWritten: hourly.bucketsWritten }, + daily: { bucketsWritten: daily.bucketsWritten }, + }); + } catch (error: any) { + console.error('Analytics aggregation failed:', error); + return NextResponse.json( + { error: error.message || 'Aggregation failed' }, + { status: 500 } + ); + } +} + +export const GET = withCronAuth(handleAggregateAnalytics); diff --git a/apps/backend/src/services/analytics-aggregation.service.ts b/apps/backend/src/services/analytics-aggregation.service.ts new file mode 100644 index 00000000..75b68337 --- /dev/null +++ b/apps/backend/src/services/analytics-aggregation.service.ts @@ -0,0 +1,110 @@ +import { createClient } from '@/lib/supabase/server'; + +export type RollupGranularity = '1h' | '24h'; + +const BUCKET_MS: Record = { + '1h': 60 * 60 * 1_000, + '24h': 24 * 60 * 60 * 1_000, +}; + +/** + * Incremental analytics aggregation service. + * + * For each granularity (1h / 24h) it: + * 1. Reads the cursor (last_run_at) from rollup_cursors + * 2. Fetches only new deployment_analytics rows since that cursor + * 3. Groups them into time buckets and upserts into analytics_rollups + * 4. Advances the cursor to the current time + * + * The upsert is idempotent — re-running with the same data produces the + * same result because we recompute each bucket from the raw rows. + */ +export class AnalyticsAggregationService { + async aggregate(granularity: RollupGranularity): Promise<{ bucketsWritten: number }> { + const supabase = createClient(); + const bucketMs = BUCKET_MS[granularity]; + + // 1. Read cursor + const { data: cursorRow, error: cursorErr } = await supabase + .from('rollup_cursors') + .select('last_run_at') + .eq('granularity', granularity) + .single(); + + if (cursorErr) throw new Error(`Failed to read rollup cursor: ${cursorErr.message}`); + const since = new Date(cursorRow!.last_run_at); + const now = new Date(); + + // 2. Fetch new events since the last cursor + const { data: rows, error: rowsErr } = await supabase + .from('deployment_analytics') + .select('deployment_id, metric_type, metric_value, recorded_at') + .gte('recorded_at', since.toISOString()) + .lt('recorded_at', now.toISOString()) + .order('recorded_at', { ascending: true }); + + if (rowsErr) throw new Error(`Failed to fetch analytics rows: ${rowsErr.message}`); + if (!rows || rows.length === 0) return { bucketsWritten: 0 }; + + // 3. Group into buckets + type BucketKey = string; // `${deploymentId}|${metricType}|${bucketStart}` + const buckets = new Map(); + + for (const row of rows) { + const ts = new Date(row.recorded_at).getTime(); + const bucketStart = new Date(Math.floor(ts / bucketMs) * bucketMs).toISOString(); + const key: BucketKey = `${row.deployment_id}|${row.metric_type}|${bucketStart}`; + + const existing = buckets.get(key) ?? { + deployment_id: row.deployment_id, + metric_type: row.metric_type, + bucket_start: bucketStart, + total_value: 0, + record_count: 0, + up_count: 0, + }; + + existing.total_value += Number(row.metric_value); + existing.record_count += 1; + if (row.metric_type === 'uptime_check' && row.metric_value === 1) { + existing.up_count += 1; + } + + buckets.set(key, existing); + } + + // 4. Upsert rollup buckets + const upsertRows = Array.from(buckets.values()).map((b) => ({ + ...b, + granularity, + updated_at: now.toISOString(), + })); + + const { error: upsertErr } = await supabase + .from('analytics_rollups') + .upsert(upsertRows, { + onConflict: 'deployment_id,metric_type,granularity,bucket_start', + }); + + if (upsertErr) throw new Error(`Failed to upsert rollups: ${upsertErr.message}`); + + // 5. Advance cursor + const { error: updateErr } = await supabase + .from('rollup_cursors') + .update({ last_run_at: now.toISOString() }) + .eq('granularity', granularity); + + if (updateErr) throw new Error(`Failed to advance rollup cursor: ${updateErr.message}`); + + return { bucketsWritten: buckets.size }; + } +} + +export const analyticsAggregationService = new AnalyticsAggregationService(); diff --git a/apps/backend/tests/analytics/analytics-aggregation.test.ts b/apps/backend/tests/analytics/analytics-aggregation.test.ts new file mode 100644 index 00000000..85b869c0 --- /dev/null +++ b/apps/backend/tests/analytics/analytics-aggregation.test.ts @@ -0,0 +1,166 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { AnalyticsAggregationService } from '@/services/analytics-aggregation.service'; + +// ── Supabase mock ───────────────────────────────────────────────────────────── + +type Row = Record; + +function buildSupabaseMock(opts: { + cursor: string; + rows: Row[]; + upsertError?: string; + updateError?: string; +}) { + const upserted: Row[][] = []; + const updated: Row[][] = []; + + const mock = { + from: (table: string) => { + if (table === 'rollup_cursors') { + return { + select: () => ({ + eq: () => ({ + single: async () => ({ + data: { last_run_at: opts.cursor }, + error: null, + }), + }), + }), + update: (data: Row) => ({ + eq: () => { + updated.push([data]); + return { error: opts.updateError ? { message: opts.updateError } : null }; + }, + }), + }; + } + + if (table === 'deployment_analytics') { + return { + select: () => ({ + gte: () => ({ + lt: () => ({ + order: async () => ({ + data: opts.rows, + error: null, + }), + }), + }), + }), + }; + } + + if (table === 'analytics_rollups') { + return { + upsert: (rows: Row[]) => { + upserted.push(rows); + return { error: opts.upsertError ? { message: opts.upsertError } : null }; + }, + }; + } + + throw new Error(`Unexpected table: ${table}`); + }, + _upserted: upserted, + _updated: updated, + }; + + return mock; +} + +vi.mock('@/lib/supabase/server', () => ({ + createClient: vi.fn(), +})); + +import * as serverModule from '@/lib/supabase/server'; + +describe('AnalyticsAggregationService', () => { + const svc = new AnalyticsAggregationService(); + const createClientMock = vi.mocked(serverModule.createClient); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('returns 0 bucketsWritten when no new rows since cursor', async () => { + const db = buildSupabaseMock({ cursor: new Date().toISOString(), rows: [] }); + createClientMock.mockReturnValue(db as any); + + const result = await svc.aggregate('1h'); + expect(result.bucketsWritten).toBe(0); + }); + + it('correctly groups rows into hourly buckets', async () => { + const t0 = new Date('2024-01-01T10:00:00Z'); + const t1 = new Date('2024-01-01T10:30:00Z'); // same hour bucket + const t2 = new Date('2024-01-01T11:15:00Z'); // next hour bucket + + const rows = [ + { deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t0.toISOString() }, + { deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t1.toISOString() }, + { deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t2.toISOString() }, + ]; + + const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows }); + createClientMock.mockReturnValue(db as any); + + const result = await svc.aggregate('1h'); + expect(result.bucketsWritten).toBe(2); // two distinct hourly buckets + + const upserted = db._upserted[0]; + const bucket10 = upserted.find((r) => (r.bucket_start as string).includes('T10:')); + const bucket11 = upserted.find((r) => (r.bucket_start as string).includes('T11:')); + + expect(bucket10?.record_count).toBe(2); + expect(bucket11?.record_count).toBe(1); + }); + + it('counts up_count for uptime_check metric', async () => { + const rows = [ + { deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 1, recorded_at: '2024-01-01T10:00:00Z' }, + { deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 0, recorded_at: '2024-01-01T10:10:00Z' }, + { deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 1, recorded_at: '2024-01-01T10:20:00Z' }, + ]; + + const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows }); + createClientMock.mockReturnValue(db as any); + + await svc.aggregate('1h'); + + const upserted = db._upserted[0]; + expect(upserted[0].up_count).toBe(2); + expect(upserted[0].record_count).toBe(3); + }); + + it('is idempotent — running twice with the same rows produces the same upsert payload', async () => { + const rows = [ + { deployment_id: 'dep-3', metric_type: 'page_view', metric_value: 5, recorded_at: '2024-01-01T08:00:00Z' }, + ]; + + const db1 = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows }); + createClientMock.mockReturnValue(db1 as any); + await svc.aggregate('24h'); + + const db2 = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows }); + createClientMock.mockReturnValue(db2 as any); + await svc.aggregate('24h'); + + // Both runs produce identical upsert payloads (same bucket keys) + const keys1 = db1._upserted[0].map((r) => `${r.deployment_id}|${r.metric_type}|${r.bucket_start}`); + const keys2 = db2._upserted[0].map((r) => `${r.deployment_id}|${r.metric_type}|${r.bucket_start}`); + expect(keys1).toEqual(keys2); + }); + + it('assigns rows to correct 24h bucket boundaries', async () => { + const rows = [ + { deployment_id: 'dep-4', metric_type: 'page_view', metric_value: 1, recorded_at: '2024-01-01T23:59:00Z' }, + { deployment_id: 'dep-4', metric_type: 'page_view', metric_value: 1, recorded_at: '2024-01-02T00:01:00Z' }, + ]; + + const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows }); + createClientMock.mockReturnValue(db as any); + + const result = await svc.aggregate('24h'); + expect(result.bucketsWritten).toBe(2); // two different day buckets + }); +}); diff --git a/supabase/migrations/016_analytics_rollups.sql b/supabase/migrations/016_analytics_rollups.sql new file mode 100644 index 00000000..52e06228 --- /dev/null +++ b/supabase/migrations/016_analytics_rollups.sql @@ -0,0 +1,64 @@ +-- Migration 016: Analytics Time-Series Rollup Tables +-- +-- Adds hourly and daily pre-aggregated rollup buckets to enable +-- fast dashboard queries without full table scans of deployment_analytics. +-- +-- Issue: #768 — Analytics Data Pipeline with Time-Series Aggregation + +-- ── analytics_rollups table ─────────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS analytics_rollups ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + deployment_id UUID NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, + metric_type TEXT NOT NULL, + bucket_start TIMESTAMPTZ NOT NULL, + granularity TEXT NOT NULL CHECK (granularity IN ('1h', '24h')), + total_value NUMERIC NOT NULL DEFAULT 0, + record_count INT NOT NULL DEFAULT 0, + up_count INT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + UNIQUE (deployment_id, metric_type, granularity, bucket_start) +); + +CREATE INDEX IF NOT EXISTS analytics_rollups_deployment_idx + ON analytics_rollups (deployment_id, metric_type, granularity, bucket_start DESC); + +-- ── rollup_cursors: tracks last aggregated event per granularity ────────────── + +CREATE TABLE IF NOT EXISTS rollup_cursors ( + granularity TEXT NOT NULL, + last_run_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01T00:00:00Z', + + PRIMARY KEY (granularity) +); + +INSERT INTO rollup_cursors (granularity, last_run_at) +VALUES ('1h', '1970-01-01T00:00:00Z'), ('24h', '1970-01-01T00:00:00Z') +ON CONFLICT DO NOTHING; + +-- ── RLS ─────────────────────────────────────────────────────────────────────── + +ALTER TABLE analytics_rollups ENABLE ROW LEVEL SECURITY; +ALTER TABLE rollup_cursors ENABLE ROW LEVEL SECURITY; + +-- Users may read rollups for their own deployments +CREATE POLICY "users read own rollups" + ON analytics_rollups FOR SELECT + USING ( + EXISTS ( + SELECT 1 FROM deployments d + WHERE d.id = deployment_id + AND d.user_id = auth.uid() + ) + ); + +-- Service role handles writes (cron job) +CREATE POLICY "service role write rollups" + ON analytics_rollups FOR ALL + USING (auth.role() = 'service_role'); + +CREATE POLICY "service role manage cursors" + ON rollup_cursors FOR ALL + USING (auth.role() = 'service_role');