Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions apps/backend/src/app/api/cron/aggregate-analytics/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { NextRequest, NextResponse } from 'next/server';
import { withCronAuth } from '@/lib/api/cron-auth';
import { analyticsAggregationService } from '@/services/analytics-aggregation.service';

async function handleAggregateAnalytics(_req: NextRequest) {
try {
const [hourly, daily] = await Promise.all([
analyticsAggregationService.aggregate('1h'),
analyticsAggregationService.aggregate('24h'),
]);

return NextResponse.json({
success: true,
hourly: { bucketsWritten: hourly.bucketsWritten },
daily: { bucketsWritten: daily.bucketsWritten },
});
} catch (error: any) {
console.error('Analytics aggregation failed:', error);
return NextResponse.json(
{ error: error.message || 'Aggregation failed' },
{ status: 500 }
);
}
}

export const GET = withCronAuth(handleAggregateAnalytics);
110 changes: 110 additions & 0 deletions apps/backend/src/services/analytics-aggregation.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { createClient } from '@/lib/supabase/server';

export type RollupGranularity = '1h' | '24h';

const BUCKET_MS: Record<RollupGranularity, number> = {
'1h': 60 * 60 * 1_000,
'24h': 24 * 60 * 60 * 1_000,
};

/**
* Incremental analytics aggregation service.
*
* For each granularity (1h / 24h) it:
* 1. Reads the cursor (last_run_at) from rollup_cursors
* 2. Fetches only new deployment_analytics rows since that cursor
* 3. Groups them into time buckets and upserts into analytics_rollups
* 4. Advances the cursor to the current time
*
* The upsert is idempotent — re-running with the same data produces the
* same result because we recompute each bucket from the raw rows.
*/
export class AnalyticsAggregationService {
async aggregate(granularity: RollupGranularity): Promise<{ bucketsWritten: number }> {
const supabase = createClient();
const bucketMs = BUCKET_MS[granularity];

// 1. Read cursor
const { data: cursorRow, error: cursorErr } = await supabase
.from('rollup_cursors')
.select('last_run_at')
.eq('granularity', granularity)
.single();

if (cursorErr) throw new Error(`Failed to read rollup cursor: ${cursorErr.message}`);
const since = new Date(cursorRow!.last_run_at);
const now = new Date();

// 2. Fetch new events since the last cursor
const { data: rows, error: rowsErr } = await supabase
.from('deployment_analytics')
.select('deployment_id, metric_type, metric_value, recorded_at')
.gte('recorded_at', since.toISOString())
.lt('recorded_at', now.toISOString())
.order('recorded_at', { ascending: true });

if (rowsErr) throw new Error(`Failed to fetch analytics rows: ${rowsErr.message}`);
if (!rows || rows.length === 0) return { bucketsWritten: 0 };

// 3. Group into buckets
type BucketKey = string; // `${deploymentId}|${metricType}|${bucketStart}`
const buckets = new Map<BucketKey, {
deployment_id: string;
metric_type: string;
bucket_start: string;
total_value: number;
record_count: number;
up_count: number;
}>();

for (const row of rows) {
const ts = new Date(row.recorded_at).getTime();
const bucketStart = new Date(Math.floor(ts / bucketMs) * bucketMs).toISOString();
const key: BucketKey = `${row.deployment_id}|${row.metric_type}|${bucketStart}`;

const existing = buckets.get(key) ?? {
deployment_id: row.deployment_id,
metric_type: row.metric_type,
bucket_start: bucketStart,
total_value: 0,
record_count: 0,
up_count: 0,
};

existing.total_value += Number(row.metric_value);
existing.record_count += 1;
if (row.metric_type === 'uptime_check' && row.metric_value === 1) {
existing.up_count += 1;
}

buckets.set(key, existing);
}

// 4. Upsert rollup buckets
const upsertRows = Array.from(buckets.values()).map((b) => ({
...b,
granularity,
updated_at: now.toISOString(),
}));

const { error: upsertErr } = await supabase
.from('analytics_rollups')
.upsert(upsertRows, {
onConflict: 'deployment_id,metric_type,granularity,bucket_start',
});

if (upsertErr) throw new Error(`Failed to upsert rollups: ${upsertErr.message}`);

// 5. Advance cursor
const { error: updateErr } = await supabase
.from('rollup_cursors')
.update({ last_run_at: now.toISOString() })
.eq('granularity', granularity);

if (updateErr) throw new Error(`Failed to advance rollup cursor: ${updateErr.message}`);

return { bucketsWritten: buckets.size };
}
}

export const analyticsAggregationService = new AnalyticsAggregationService();
166 changes: 166 additions & 0 deletions apps/backend/tests/analytics/analytics-aggregation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { AnalyticsAggregationService } from '@/services/analytics-aggregation.service';

// ── Supabase mock ─────────────────────────────────────────────────────────────

type Row = Record<string, unknown>;

function buildSupabaseMock(opts: {
cursor: string;
rows: Row[];
upsertError?: string;
updateError?: string;
}) {
const upserted: Row[][] = [];
const updated: Row[][] = [];

const mock = {
from: (table: string) => {
if (table === 'rollup_cursors') {
return {
select: () => ({
eq: () => ({
single: async () => ({
data: { last_run_at: opts.cursor },
error: null,
}),
}),
}),
update: (data: Row) => ({
eq: () => {
updated.push([data]);
return { error: opts.updateError ? { message: opts.updateError } : null };
},
}),
};
}

if (table === 'deployment_analytics') {
return {
select: () => ({
gte: () => ({
lt: () => ({
order: async () => ({
data: opts.rows,
error: null,
}),
}),
}),
}),
};
}

if (table === 'analytics_rollups') {
return {
upsert: (rows: Row[]) => {
upserted.push(rows);
return { error: opts.upsertError ? { message: opts.upsertError } : null };
},
};
}

throw new Error(`Unexpected table: ${table}`);
},
_upserted: upserted,
_updated: updated,
};

return mock;
}

vi.mock('@/lib/supabase/server', () => ({
createClient: vi.fn(),
}));

import * as serverModule from '@/lib/supabase/server';

describe('AnalyticsAggregationService', () => {
const svc = new AnalyticsAggregationService();
const createClientMock = vi.mocked(serverModule.createClient);

beforeEach(() => {
vi.clearAllMocks();
});

it('returns 0 bucketsWritten when no new rows since cursor', async () => {
const db = buildSupabaseMock({ cursor: new Date().toISOString(), rows: [] });
createClientMock.mockReturnValue(db as any);

const result = await svc.aggregate('1h');
expect(result.bucketsWritten).toBe(0);
});

it('correctly groups rows into hourly buckets', async () => {
const t0 = new Date('2024-01-01T10:00:00Z');
const t1 = new Date('2024-01-01T10:30:00Z'); // same hour bucket
const t2 = new Date('2024-01-01T11:15:00Z'); // next hour bucket

const rows = [
{ deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t0.toISOString() },
{ deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t1.toISOString() },
{ deployment_id: 'dep-1', metric_type: 'page_view', metric_value: 1, recorded_at: t2.toISOString() },
];

const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows });
createClientMock.mockReturnValue(db as any);

const result = await svc.aggregate('1h');
expect(result.bucketsWritten).toBe(2); // two distinct hourly buckets

const upserted = db._upserted[0];
const bucket10 = upserted.find((r) => (r.bucket_start as string).includes('T10:'));
const bucket11 = upserted.find((r) => (r.bucket_start as string).includes('T11:'));

expect(bucket10?.record_count).toBe(2);
expect(bucket11?.record_count).toBe(1);
});

it('counts up_count for uptime_check metric', async () => {
const rows = [
{ deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 1, recorded_at: '2024-01-01T10:00:00Z' },
{ deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 0, recorded_at: '2024-01-01T10:10:00Z' },
{ deployment_id: 'dep-2', metric_type: 'uptime_check', metric_value: 1, recorded_at: '2024-01-01T10:20:00Z' },
];

const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows });
createClientMock.mockReturnValue(db as any);

await svc.aggregate('1h');

const upserted = db._upserted[0];
expect(upserted[0].up_count).toBe(2);
expect(upserted[0].record_count).toBe(3);
});

it('is idempotent — running twice with the same rows produces the same upsert payload', async () => {
const rows = [
{ deployment_id: 'dep-3', metric_type: 'page_view', metric_value: 5, recorded_at: '2024-01-01T08:00:00Z' },
];

const db1 = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows });
createClientMock.mockReturnValue(db1 as any);
await svc.aggregate('24h');

const db2 = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows });
createClientMock.mockReturnValue(db2 as any);
await svc.aggregate('24h');

// Both runs produce identical upsert payloads (same bucket keys)
const keys1 = db1._upserted[0].map((r) => `${r.deployment_id}|${r.metric_type}|${r.bucket_start}`);
const keys2 = db2._upserted[0].map((r) => `${r.deployment_id}|${r.metric_type}|${r.bucket_start}`);
expect(keys1).toEqual(keys2);
});

it('assigns rows to correct 24h bucket boundaries', async () => {
const rows = [
{ deployment_id: 'dep-4', metric_type: 'page_view', metric_value: 1, recorded_at: '2024-01-01T23:59:00Z' },
{ deployment_id: 'dep-4', metric_type: 'page_view', metric_value: 1, recorded_at: '2024-01-02T00:01:00Z' },
];

const db = buildSupabaseMock({ cursor: '1970-01-01T00:00:00Z', rows });
createClientMock.mockReturnValue(db as any);

const result = await svc.aggregate('24h');
expect(result.bucketsWritten).toBe(2); // two different day buckets
});
});
64 changes: 64 additions & 0 deletions supabase/migrations/016_analytics_rollups.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
-- Migration 016: Analytics Time-Series Rollup Tables
--
-- Adds hourly and daily pre-aggregated rollup buckets to enable
-- fast dashboard queries without full table scans of deployment_analytics.
--
-- Issue: #768 — Analytics Data Pipeline with Time-Series Aggregation

-- ── analytics_rollups table ───────────────────────────────────────────────────

CREATE TABLE IF NOT EXISTS analytics_rollups (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
deployment_id UUID NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
metric_type TEXT NOT NULL,
bucket_start TIMESTAMPTZ NOT NULL,
granularity TEXT NOT NULL CHECK (granularity IN ('1h', '24h')),
total_value NUMERIC NOT NULL DEFAULT 0,
record_count INT NOT NULL DEFAULT 0,
up_count INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),

UNIQUE (deployment_id, metric_type, granularity, bucket_start)
);

CREATE INDEX IF NOT EXISTS analytics_rollups_deployment_idx
ON analytics_rollups (deployment_id, metric_type, granularity, bucket_start DESC);

-- ── rollup_cursors: tracks last aggregated event per granularity ──────────────

CREATE TABLE IF NOT EXISTS rollup_cursors (
granularity TEXT NOT NULL,
last_run_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01T00:00:00Z',

PRIMARY KEY (granularity)
);

INSERT INTO rollup_cursors (granularity, last_run_at)
VALUES ('1h', '1970-01-01T00:00:00Z'), ('24h', '1970-01-01T00:00:00Z')
ON CONFLICT DO NOTHING;

-- ── RLS ───────────────────────────────────────────────────────────────────────

ALTER TABLE analytics_rollups ENABLE ROW LEVEL SECURITY;
ALTER TABLE rollup_cursors ENABLE ROW LEVEL SECURITY;

-- Users may read rollups for their own deployments
CREATE POLICY "users read own rollups"
ON analytics_rollups FOR SELECT
USING (
EXISTS (
SELECT 1 FROM deployments d
WHERE d.id = deployment_id
AND d.user_id = auth.uid()
)
);

-- Service role handles writes (cron job)
CREATE POLICY "service role write rollups"
ON analytics_rollups FOR ALL
USING (auth.role() = 'service_role');

CREATE POLICY "service role manage cursors"
ON rollup_cursors FOR ALL
USING (auth.role() = 'service_role');
Loading