Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
749 changes: 749 additions & 0 deletions backend/package-lock.json

Large diffs are not rendered by default.

22 changes: 12 additions & 10 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
getStreamHistory,
countStreamEvents,
getStreamEventSummary,
StreamEventType,
} from "./services/eventHistory";
import { fetchOpenIssues } from "./services/openIssues";
import {
Expand Down Expand Up @@ -282,7 +283,6 @@ app.use(helmet({
preload: true,
},
}));
app.use(cors());
const ALLOWED_ORIGINS = process.env.ALLOWED_ORIGINS;

if (ALLOWED_ORIGINS) {
Expand Down Expand Up @@ -588,25 +588,27 @@ app.get("/api/events", readLimiter, (req: Request, res: Response) => {
}

const query = parsedQuery.data;
const hasPage = req.query.page !== undefined;
const hasLimit = req.query.limit !== undefined;

const eventType = query.eventType as Parameters<typeof getGlobalEvents>[2];
const total = countAllEvents(eventType);
const eventType = query.eventType as StreamEventType | undefined;
const streamId = query.streamId;
const since = query.since;

const total = countAllEvents(eventType, streamId, since);

const page = query.page ?? PAGINATION_DEFAULT_PAGE;
const limit =
!hasPage && !hasLimit ? total : (query.limit ?? PAGINATION_DEFAULT_LIMIT);
const pageSize = query.pageSize ?? query.limit ?? PAGINATION_DEFAULT_LIMIT;

const offset = (page - 1) * limit;
const offset = (page - 1) * pageSize;
const data = getGlobalEvents(
limit === 0 ? 0 : limit,
pageSize,
offset,
eventType,
query.cursor,
streamId,
since,
);

res.json({ data, total, page, limit });
res.json({ data, total, page, pageSize, limit: pageSize });
});

app.get(
Expand Down
75 changes: 75 additions & 0 deletions backend/src/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,81 @@ describe("Backend Integration Tests", () => {
expect(streamIds.has("2")).toBe(true);
expect(streamIds.has("3")).toBe(true);
});

it("should filter by streamId", async () => {
const response = await request(app)
.get("/api/events")
.query({ streamId: "1" });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(2);
expect(response.body.total).toBe(2);
response.body.data.forEach((e: any) => {
expect(e.streamId).toBe("1");
});
});

it("should filter by since timestamp", async () => {
const now = Math.floor(Date.now() / 1000);
// Events were inserted at now+1, now+2, now+3 (created) and now+100 (canceled)
// Filtering since now+50 should only return the canceled event
const response = await request(app)
.get("/api/events")
.query({ since: now + 50 });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(1);
expect(response.body.data[0].eventType).toBe("canceled");
});

it("should use pageSize parameter (default 20)", async () => {
const response = await request(app)
.get("/api/events")
.query({ pageSize: 2 });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(2);
expect(response.body.pageSize).toBe(2);
expect(response.body.total).toBe(4);
expect(response.body.page).toBe(1);
});

it("should combine eventType and streamId filters", async () => {
const response = await request(app)
.get("/api/events")
.query({ eventType: "created", streamId: "1" });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(1);
expect(response.body.total).toBe(1);
expect(response.body.data[0].eventType).toBe("created");
expect(response.body.data[0].streamId).toBe("1");
});

it("should combine eventType, streamId, and since filters", async () => {
const now = Math.floor(Date.now() / 1000);
const response = await request(app)
.get("/api/events")
.query({ eventType: "created", streamId: "1", since: now });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(1);
expect(response.body.total).toBe(1);
expect(response.body.data[0].eventType).toBe("created");
expect(response.body.data[0].streamId).toBe("1");
});

it("should paginate with pageSize", async () => {
const response = await request(app)
.get("/api/events")
.query({ page: 1, pageSize: 2 });

expect(response.status).toBe(200);
expect(response.body.data).toHaveLength(2);
expect(response.body.total).toBe(4);
expect(response.body.page).toBe(1);
expect(response.body.pageSize).toBe(2);
});
});
});

Expand Down
2 changes: 2 additions & 0 deletions backend/src/middleware/requestLogger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Request, Response } from "express";
describe("requestLogger", () => {
const originalNodeEnv = process.env.NODE_ENV;
const loggerInfoSpy = vi.spyOn(logger, "info").mockImplementation(() => logger);
vi.spyOn(logger, "child").mockImplementation(() => logger);

beforeEach(() => {
loggerInfoSpy.mockClear();
Expand All @@ -29,6 +30,7 @@ describe("requestLogger", () => {

const res = new EventEmitter() as Response;
(res as any).statusCode = 201;
(res as any).setHeader = vi.fn();

const next = vi.fn();

Expand Down
10 changes: 5 additions & 5 deletions backend/src/services/eventHistory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe("eventHistory", () => {

expect(countStreamEvents("stream-1")).toBe(2);

const history = getStreamHistory("stream-1");
const history = getStreamHistory("stream-1", 20, 0, "asc");
expect(history).toHaveLength(2);
expect(history.map((e) => e.eventType)).toEqual(["created", "claimed"]);
});
Expand Down Expand Up @@ -149,7 +149,7 @@ describe("eventHistory", () => {
recordEvent("stream-4", "start_time_updated", 2000);
recordEvent("stream-4", "canceled", 4000);

const history = getStreamHistory("stream-4");
const history = getStreamHistory("stream-4", 20, 0, "asc");

expect(history.map((e) => e.timestamp)).toEqual([1000, 2000, 3000, 4000]);
expect(history.map((e) => e.eventType)).toEqual([
Expand All @@ -167,7 +167,7 @@ describe("eventHistory", () => {
recordEvent("stream-5", "claimed", 1000, "second");
recordEvent("stream-5", "canceled", 1000, "third");

const history = getStreamHistory("stream-5");
const history = getStreamHistory("stream-5", 20, 0, "asc");

expect(history.map((e) => e.actor)).toEqual(["first", "second", "third"]);
});
Expand All @@ -179,8 +179,8 @@ describe("eventHistory", () => {
recordEvent("stream-B", "created", 500);
recordEvent("stream-A", "claimed", 2000);

const historyA = getStreamHistory("stream-A");
const historyB = getStreamHistory("stream-B");
const historyA = getStreamHistory("stream-A", 20, 0, "asc");
const historyB = getStreamHistory("stream-B", 20, 0, "asc");

expect(historyA).toHaveLength(2);
expect(historyA.map((e) => e.timestamp)).toEqual([1000, 2000]);
Expand Down
75 changes: 56 additions & 19 deletions backend/src/services/eventHistory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,74 @@ export function getGlobalEvents(
offset: number,
eventType?: StreamEventType,
cursor?: number,
streamId?: string,
since?: number,
): StreamEvent[] {
const db = getDb();
const conditions: string[] = [];
const params: any[] = [];

if (eventType) {
let query = `SELECT * FROM stream_events WHERE event_type = ?`;
const params: any[] = [eventType];
conditions.push("event_type = ?");
params.push(eventType);
}

if (cursor !== undefined) {
conditions.push("id < ?");
params.push(cursor);
}

if (cursor !== undefined) {
query += ` AND id < ?`;
params.push(cursor);
}
if (streamId) {
conditions.push("stream_id = ?");
params.push(streamId);
}

query += ` ORDER BY timestamp DESC, id DESC LIMIT ? OFFSET ?`;
params.push(limit, offset);
if (since !== undefined) {
conditions.push("timestamp > ?");
params.push(since);
}

const rows = db.prepare(query).all(...params) as EventRow[];
return rows.map(rowToEvent);
let query = "SELECT * FROM stream_events";
if (conditions.length > 0) {
query += " WHERE " + conditions.join(" AND ");
}
return getAllEvents(limit, offset, cursor);
query += " ORDER BY timestamp DESC, id DESC LIMIT ? OFFSET ?";
params.push(limit, offset);

const rows = db.prepare(query).all(...params) as EventRow[];
return rows.map(rowToEvent);
}

export function countAllEvents(eventType?: StreamEventType): number {
export function countAllEvents(
eventType?: StreamEventType,
streamId?: string,
since?: number,
): number {
const db = getDb();
const conditions: string[] = [];
const params: any[] = [];

if (eventType) {
const row = db
.prepare(`SELECT COUNT(*) as count FROM stream_events WHERE event_type = ?`)
.get(eventType) as { count: number };
return row.count;
conditions.push("event_type = ?");
params.push(eventType);
}
const row = db
.prepare(`SELECT COUNT(*) as count FROM stream_events`)
.get() as { count: number };

if (streamId) {
conditions.push("stream_id = ?");
params.push(streamId);
}

if (since !== undefined) {
conditions.push("timestamp > ?");
params.push(since);
}

let query = "SELECT COUNT(*) as count FROM stream_events";
if (conditions.length > 0) {
query += " WHERE " + conditions.join(" AND ");
}

const row = db.prepare(query).get(...params) as { count: number };
return row.count;
}

Expand Down
15 changes: 13 additions & 2 deletions backend/src/services/streamStore.reconcile.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const mockState = vi.hoisted(() => ({
const dbMocks = vi.hoisted(() => ({
initDb: vi.fn(),
getDb: vi.fn(),
syncFtsIndex: vi.fn(),
}));

const eventHistoryMocks = vi.hoisted(() => ({
Expand All @@ -38,9 +39,19 @@ const eventHistoryMocks = vi.hoisted(() => ({
}),
}));

const loggerMocks = vi.hoisted(() => ({
logger: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
}));

vi.mock("./db", () => dbMocks);
vi.mock("./eventHistory", () => eventHistoryMocks);
vi.mock("./webhook", () => ({ triggerWebhook: vi.fn() }));
vi.mock("../logger", () => loggerMocks);

vi.mock("@stellar/stellar-sdk", () => {
class MockContract {
Expand Down Expand Up @@ -316,7 +327,7 @@ describe("reconcileMissingStreams – sync correctness", () => {
// Trigger timeout on all RPC calls
mockState.simulateTimeout = true;

const errorSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
const errorSpy = loggerMocks.logger.error;

const { initSoroban, reconcileMissingStreams } = await import("./streamStore");
await initSoroban();
Expand All @@ -327,7 +338,7 @@ describe("reconcileMissingStreams – sync correctness", () => {
expect(errorSpy).toHaveBeenCalled();
expect(mockState.upsertedStreams).toHaveLength(0);

errorSpy.mockRestore();
errorSpy.mockClear();
});

it("is idempotent — running twice does not duplicate rows or events", async () => {
Expand Down
18 changes: 15 additions & 3 deletions backend/src/services/streamStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const mockState = vi.hoisted(() => ({
const dbMocks = vi.hoisted(() => ({
initDb: vi.fn(),
getDb: vi.fn(),
syncFtsIndex: vi.fn(),
}));

const eventHistoryMocks = vi.hoisted(() => ({
Expand All @@ -33,11 +34,21 @@ const eventHistoryMocks = vi.hoisted(() => ({
}),
}));

const loggerMocks = vi.hoisted(() => ({
logger: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
}));

vi.mock("./db", () => dbMocks);
vi.mock("./eventHistory", () => eventHistoryMocks);
vi.mock("./webhook", () => ({
triggerWebhook: vi.fn(),
}));
vi.mock("../logger", () => loggerMocks);

vi.mock("@stellar/stellar-sdk", () => {
class MockContract {
Expand Down Expand Up @@ -272,7 +283,7 @@ describe("reconcileMissingStreams", () => {
mockState.nextId = 3;
mockState.existingStreamIds = new Set(["1"]);

const errorSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
const errorSpy = loggerMocks.logger.error;

const { initSoroban, reconcileMissingStreams } = await import("./streamStore");

Expand All @@ -281,11 +292,12 @@ describe("reconcileMissingStreams", () => {

expect(repaired).toBe(0);
expect(errorSpy).toHaveBeenCalledWith(
"[reconciliation] missing stream 2 could not be fetched from chain",
{ streamId: 2 },
"missing stream could not be fetched from chain",
);
expect(eventHistoryMocks.recordEventWithDb).not.toHaveBeenCalled();

errorSpy.mockRestore();
errorSpy.mockClear();
});
});

Expand Down
Loading
Loading