diff --git a/server/test/integ/README.md b/server/test/integ/README.md
new file mode 100644
index 00000000..062ee546
--- /dev/null
+++ b/server/test/integ/README.md
@@ -0,0 +1,49 @@
+# Integration tests
+
+End-to-end tests that exercise the real Coop stack — Postgres, Scylla,
+ClickHouse, Redis, and an inline item-processing worker. Unlike unit tests
+(which mock the data warehouse), these run against the same services that
+`npm start` uses.
+
+These tests implement the scenarios filed under issue
+[#288](https://github.com/roostorg/coop/issues/288).
+
+## Running
+
+From the repo root:
+
+```bash
+npm run up         # boot postgres, clickhouse, scylla, redis, hma, otel
+npm run db:update  # apply Postgres + ClickHouse migrations
+cd server && npm run test:integ
+```
+
+`npm run up` opens Jaeger at <http://localhost:16686>. Stop infra with
+`npm run down` when done.
+
+## Layout
+
+| File | Purpose |
+| --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
+| `setupIntegrationServer.ts` | Boots the real IoC container, starts the express app and `ItemProcessingWorker` inline, returns a `supertest` agent + shutdown handle. |
+| `wait.ts` | Polling helpers — `waitForItemInScylla`, `waitForItemInClickHouse`, generic `waitFor`. |
+| `*.integ.test.ts` | The tests themselves. Picked up by `jest.integ.config.cjs`, excluded from the unit `jest.config.cjs`. |
+
+Fixture helpers (`createOrg`, `createContentItemTypes`, ...) live in
+`server/test/fixtureHelpers/` and are shared with unit tests.
+
+## Conventions
+
+- One `describe` per scenario; one or more `test()`s inside.
+- Generate unique `orgId` / `itemId` per `describe` so concurrent runs don't
+  collide.
+- `beforeAll` boots the harness and creates fixtures; `afterAll` cleans them
+  up and calls `harness.shutdown()`.
+- Default timeouts: 60s for `beforeAll`/`test`, 30s for `afterAll`.
+- Polls default to 250ms interval, 30s timeout — override per-call when a
+  scenario is known to be faster or slower.
+
+## CI
+
+Not yet wired. A follow-up PR will add a workflow that boots the
+`docker-compose.yaml` services and runs `npm run test:integ`.
diff --git a/server/test/integ/items-submission.integ.test.ts b/server/test/integ/items-submission.integ.test.ts
new file mode 100644
index 00000000..9de811ec
--- /dev/null
+++ b/server/test/integ/items-submission.integ.test.ts
@@ -0,0 +1,100 @@
+/**
+ * Integration test for #339: end-to-end item submission.
+ *
+ * Submits an item via POST /api/v1/items/async against a real running stack
+ * (Postgres, Scylla, ClickHouse, Redis) and asserts the item lands in both
+ * Scylla (item_submission_by_thread) and ClickHouse (CONTENT_API_REQUESTS).
+ *
+ * Run with: npm run test:integ
+ * Requires: `npm run up && npm run db:update`
+ */
+import { ScalarTypes } from '@roostorg/types';
+import { uid } from 'uid';
+
+import createContentItemTypes from '../fixtureHelpers/createContentItemTypes.js';
+import createOrg from '../fixtureHelpers/createOrg.js';
+import {
+  makeIntegrationServer,
+  type IntegrationServer,
+} from './setupIntegrationServer.js';
+import { waitForItemInClickHouse, waitForItemInScylla } from './wait.js';
+
+describe('Items submission (integration)', () => {
+  const orgId = uid();
+  let harness: IntegrationServer | undefined;
+  let apiKey: string;
+  let orgCleanup: (() => Promise<void>) | undefined;
+  let itemTypeCleanup: (() => Promise<void>) | undefined;
+  let itemTypeId: string;
+
+  beforeAll(async () => {
+    harness = await makeIntegrationServer();
+
+    const orgFixture = await createOrg(
+      {
+        KyselyPg: harness.deps.KyselyPg,
+        ModerationConfigService: harness.deps.ModerationConfigService,
+        ApiKeyService: harness.deps.ApiKeyService,
+      },
+      orgId,
+    );
+    apiKey = orgFixture.apiKey;
+    orgCleanup = orgFixture.cleanup;
+
+    const itemTypeFixture = await createContentItemTypes({
+      moderationConfigService: harness.deps.ModerationConfigService,
+      orgId,
+      extra: {
+        fields: [
+          {
+            name: 'text',
+            type: ScalarTypes.STRING,
+            required: true,
+            container: null,
+          },
+        ],
+      },
+    });
+    itemTypeId = itemTypeFixture.itemTypes[0].id;
+    itemTypeCleanup = itemTypeFixture.cleanup;
+  }, 60_000);
+
+  afterAll(async () => {
+    // Guard each step so a failure in `beforeAll` doesn't trigger a second,
+    // misleading "X is not a function" error here that masks the root cause.
+    try {
+      await itemTypeCleanup?.();
+      await orgCleanup?.();
+    } finally {
+      await harness?.shutdown();
+    }
+  }, 30_000);
+
+  test('submitted item lands in Scylla and ClickHouse', async () => {
+    if (!harness) throw new Error('harness was not initialized');
+    const itemId = uid();
+
+    await harness.request
+      .post('/api/v1/items/async')
+      .set('x-api-key', apiKey)
+      .send({
+        items: [
+          { id: itemId, typeId: itemTypeId, data: { text: 'hello integ' } },
+        ],
+      })
+      .expect(202);
+
+    const scyllaRow = await waitForItemInScylla(harness.deps, {
+      orgId,
+      itemIdentifier: { id: itemId, typeId: itemTypeId },
+    });
+    expect(scyllaRow).toBeDefined();
+    expect(scyllaRow.org_id).toBe(orgId);
+
+    const chRow = await waitForItemInClickHouse(harness.deps, {
+      orgId,
+      itemIdentifier: { id: itemId, typeId: itemTypeId },
+    });
+    expect(chRow).toBeDefined();
+  }, 60_000);
+});
diff --git a/server/test/integ/setupIntegrationServer.ts b/server/test/integ/setupIntegrationServer.ts
new file mode 100644
index 00000000..79d0bca6
--- /dev/null
+++ b/server/test/integ/setupIntegrationServer.ts
@@ -0,0 +1,105 @@
+/**
+ * Integration test harness: boots the real IoC container against running infra
+ * (Postgres, Scylla, ClickHouse, Redis) and starts the item-processing worker
+ * inline so that submissions land in the data stores within the same process.
+ *
+ * Requires the docker-compose stack from `npm run up` and migrations applied
+ * via `npm run db:update`.
+ */
+// Load .env before any module that reads process.env (notably the IoC
+// container). The unit-test `npm test` path goes through dotenv via its
+// NODE_OPTIONS; `test:integ` does not, so we do it here.
+import 'dotenv/config';
+
+import * as superTest from 'supertest';
+
+import getBottle, { type Dependencies } from '../../iocContainer/index.js';
+import makeServer from '../../server.js';
+
+export type IntegrationServer = {
+  deps: Dependencies;
+  request: ReturnType<typeof superTest.agent>;
+  shutdown: () => Promise<void>;
+};
+
+/**
+ * Resolves the IoC container, builds the server via `makeServer`, wraps the
+ * app in a supertest agent and starts `ItemProcessingWorker.run()` without
+ * awaiting it.
+ *
+ * @returns The resolved dependencies, the supertest agent, and a `shutdown`
+ *   handle that aborts the worker, attempts every teardown step, and throws
+ *   an `AggregateError` if any step failed.
+ */
+export async function makeIntegrationServer(): Promise<IntegrationServer> {
+  const bottle = await getBottle();
+  const deps = bottle.container as Dependencies;
+
+  const { app, shutdown: shutdownServer } = await makeServer(deps);
+  const request = superTest.agent(app);
+
+  const workerAbort = new AbortController();
+  // Run the worker in the background — its run() promise only settles on error
+  // or shutdown, so we don't await it here.
+  const workerRun = deps.ItemProcessingWorker.run(workerAbort.signal);
+  workerRun.catch((err) => {
+    console.error('ItemProcessingWorker exited with error', err);
+  });
+
+  return {
+    deps,
+    request,
+    async shutdown() {
+      // Best-effort teardown: run every step even if an earlier one throws,
+      // so we don't leak the server or shared resources into the next test.
+      workerAbort.abort();
+
+      const runStep = async (
+        fn: () => Promise<void>,
+      ): Promise<unknown> => {
+        try {
+          await fn();
+          return null;
+        } catch (err) {
+          return err;
+        }
+      };
+
+      // Awaited left-to-right inside the array literal, so steps still run
+      // sequentially — closeSharedResourcesForShutdown depends on the worker
+      // having closed its Redis connection first.
+      const teardownErrors = [
+        await runStep(async () => {
+          await deps.ItemProcessingWorker.shutdown();
+        }),
+        await runStep(async () => {
+          await shutdownServer();
+        }),
+        await runStep(async () => {
+          // BullMQ's Worker.close() already closes the shared ioredis
+          // connection, which makes closeSharedResourcesForShutdown throw
+          // "Connection is closed" when it tries to quit() redis a second
+          // time. That specific error is benign — every shared resource is
+          // already torn down — so we swallow it here rather than leak the
+          // failure into afterAll.
+          await deps.closeSharedResourcesForShutdown().catch((err) => {
+            if (
+              err instanceof Error &&
+              err.message === 'Connection is closed.'
+            ) {
+              return;
+            }
+            throw err;
+          });
+        }),
+      ].filter((e): e is unknown => e !== null);
+
+      if (teardownErrors.length > 0) {
+        throw new AggregateError(
+          teardownErrors,
+          'Integration server shutdown failed',
+        );
+      }
+    },
+  };
+}
diff --git a/server/test/integ/wait.ts b/server/test/integ/wait.ts
new file mode 100644
index 00000000..ebb0c6e2
--- /dev/null
+++ b/server/test/integ/wait.ts
@@ -0,0 +1,128 @@
+/**
+ * Polling helpers for integration tests.
+ *
+ * Item submission is async (POST returns 202, worker processes off Redis,
+ * writes to Scylla and ClickHouse), so tests poll the data stores until the
+ * row appears or a timeout elapses.
+ */
+import { type ItemIdentifier } from '@roostorg/types';
+
+import { type Dependencies } from '../../iocContainer/index.js';
+import { itemIdentifierToScyllaItemIdentifier } from '../../scylla/index.js';
+
+const DEFAULT_TIMEOUT_MS = 30_000;
+const DEFAULT_INTERVAL_MS = 250;
+
+/**
+ * Repeatedly awaits `check` until it yields a non-nullish value, sleeping
+ * `intervalMs` between attempts.
+ *
+ * `check` always runs at least once, and the deadline is only consulted after
+ * a nullish result, so the total wait can run somewhat past `timeoutMs`.
+ * Rejections from `check` are not caught and propagate to the caller.
+ *
+ * @param what - Description of the awaited condition; appears in the timeout
+ *   error message.
+ * @param check - Probe to run; a `null`/`undefined` result means "not yet".
+ * @param opts - `timeoutMs` (default 30_000) and `intervalMs` (default 250).
+ * @returns The first non-nullish value produced by `check`.
+ * @throws Error once the deadline has passed without a non-nullish result.
+ */
+export async function waitFor<T>(
+  what: string,
+  check: () => Promise<T | null | undefined>,
+  opts: { timeoutMs?: number; intervalMs?: number } = {},
+): Promise<T> {
+  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
+  const intervalMs = opts.intervalMs ?? DEFAULT_INTERVAL_MS;
+  const deadline = Date.now() + timeoutMs;
+
+  for (;;) {
+    const result = await check();
+    if (result != null) return result;
+    if (Date.now() >= deadline) {
+      throw new Error(`Timed out after ${timeoutMs}ms waiting for: ${what}`);
+    }
+    await new Promise((r) => setTimeout(r, intervalMs));
+  }
+}
+
+/**
+ * Query Scylla directly rather than going through ItemInvestigationService:
+ * that service falls back to the partial-items endpoint and the data warehouse
+ * if Scylla returns nothing, which would mask a real Scylla write failure as a
+ * passing test.
+ *
+ * Filtering by `item_identifier` alone matches the production lookup path
+ * (`ItemInvestigationService.getItemByIdentifier`), which leans on the
+ * secondary index on `item_identifier`. The table's partition key is
+ * `(org_id, synthetic_thread_id)`, so a partial-partition-key restriction
+ * (`org_id = ?` without `synthetic_thread_id`) would need `ALLOW FILTERING`.
+ * Callers should still assert on `org_id` after the row comes back to guard
+ * against the theoretical cross-org collision.
+ *
+ * Query errors are intentionally NOT swallowed — a structural Scylla error
+ * (bad query, schema drift) should surface immediately instead of polling
+ * itself into a misleading timeout.
+ */
+export async function waitForItemInScylla(
+  deps: Pick<Dependencies, 'Scylla'>,
+  opts: {
+    orgId: string;
+    itemIdentifier: ItemIdentifier;
+    timeoutMs?: number;
+  },
+) {
+  return waitFor(
+    `item ${opts.itemIdentifier.id} in Scylla item_submission_by_thread`,
+    async () => {
+      const rows = await deps.Scylla.select({
+        from: 'item_submission_by_thread',
+        select: '*',
+        where: [
+          [
+            'item_identifier',
+            '=',
+            itemIdentifierToScyllaItemIdentifier(opts.itemIdentifier),
+          ],
+        ],
+      });
+      return rows.length > 0 ? rows[0] : null;
+    },
+    { timeoutMs: opts.timeoutMs },
+  );
+}
+
+/**
+ * Polls `analytics.CONTENT_API_REQUESTS` until a row with the given org id,
+ * item id and item type id exists, and resolves with the first such row.
+ * Query errors are not caught here, so they reject the returned promise
+ * instead of being retried.
+ */
+export async function waitForItemInClickHouse(
+  deps: Pick<Dependencies, 'DataWarehouse' | 'Tracer'>,
+  opts: {
+    orgId: string;
+    itemIdentifier: ItemIdentifier;
+    timeoutMs?: number;
+  },
+) {
+  const { orgId, itemIdentifier } = opts;
+  return waitFor(
+    `item ${itemIdentifier.id} in ClickHouse CONTENT_API_REQUESTS`,
+    async () => {
+      const rows = await deps.DataWarehouse.query(
+        `SELECT item_id, item_type_id, event
+         FROM analytics.CONTENT_API_REQUESTS
+         WHERE org_id = ?
+           AND item_id = ?
+           AND item_type_id = ?
+         LIMIT 1`,
+        deps.Tracer,
+        [orgId, itemIdentifier.id, itemIdentifier.typeId],
+      );
+      return rows.length > 0 ? rows[0] : null;
+    },
+    { timeoutMs: opts.timeoutMs },
+  );
+}