Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions server/test/integ/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Integration tests

End-to-end tests that exercise the real Coop stack — Postgres, Scylla,
ClickHouse, Redis, and an inline item-processing worker. Unlike unit tests
(which mock the data warehouse), these run against the same services that
`npm start` uses.

These tests implement the scenarios filed under issue
[#288](https://github.com/roostorg/coop/issues/288).

## Running

From the repo root:

```bash
npm run up # boot postgres, clickhouse, scylla, redis, hma, otel
npm run db:update # apply Postgres + ClickHouse migrations
cd server && npm run test:integ
```

`npm run up` opens Jaeger at <http://localhost:16686>. Stop infra with
`npm run down` when done.

## Layout

| File | Purpose |
| --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `setupIntegrationServer.ts` | Boots the real IoC container, starts the express app and `ItemProcessingWorker` inline, returns a `supertest` agent + shutdown handle. |
| `wait.ts` | Polling helpers — `waitForItemInScylla`, `waitForItemInClickHouse`, generic `waitFor`. |
| `*.integ.test.ts` | The tests themselves. Picked up by `jest.integ.config.cjs`, excluded from the unit `jest.config.cjs`. |

Fixture helpers (`createOrg`, `createContentItemTypes`, ...) live in
`server/test/fixtureHelpers/` and are shared with unit tests.

## Conventions

- One `describe` per scenario; one or more `test()`s inside.
- Generate unique `orgId` / `itemId` per `describe` so concurrent runs don't
collide.
- `beforeAll` boots the harness and creates fixtures; `afterAll` cleans them
up and calls `harness.shutdown()`.
- Default timeouts: 60s for `beforeAll`/`test`, 30s for `afterAll`.
- Polls default to 250ms interval, 30s timeout — override per-call when a
scenario is known to be faster or slower.

## CI

Not yet wired. A follow-up PR will add a workflow that boots the
`docker-compose.yaml` services and runs `npm run test:integ`.
100 changes: 100 additions & 0 deletions server/test/integ/items-submission.integ.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Integration test for #339: end-to-end item submission.
*
* Submits an item via POST /api/v1/items/async against a real running stack
* (Postgres, Scylla, ClickHouse, Redis) and asserts the item lands in both
* Scylla (item_submission_by_thread) and ClickHouse (CONTENT_API_REQUESTS).
*
* Run with: npm run test:integ
* Requires: `npm run up && npm run db:update`
*/
import { ScalarTypes } from '@roostorg/types';
import { uid } from 'uid';

import createContentItemTypes from '../fixtureHelpers/createContentItemTypes.js';
import createOrg from '../fixtureHelpers/createOrg.js';
import {
makeIntegrationServer,
type IntegrationServer,
} from './setupIntegrationServer.js';
import { waitForItemInClickHouse, waitForItemInScylla } from './wait.js';

describe('Items submission (integration)', () => {
const orgId = uid();
let harness: IntegrationServer | undefined;
let apiKey: string;
let orgCleanup: (() => Promise<void>) | undefined;
let itemTypeCleanup: (() => Promise<void>) | undefined;
let itemTypeId: string;

beforeAll(async () => {
harness = await makeIntegrationServer();

const orgFixture = await createOrg(
{
KyselyPg: harness.deps.KyselyPg,
ModerationConfigService: harness.deps.ModerationConfigService,
ApiKeyService: harness.deps.ApiKeyService,
},
orgId,
);
apiKey = orgFixture.apiKey;
orgCleanup = orgFixture.cleanup;

const itemTypeFixture = await createContentItemTypes({
moderationConfigService: harness.deps.ModerationConfigService,
orgId,
extra: {
fields: [
{
name: 'text',
type: ScalarTypes.STRING,
required: true,
container: null,
},
],
},
});
itemTypeId = itemTypeFixture.itemTypes[0].id;
itemTypeCleanup = itemTypeFixture.cleanup;
}, 60_000);

afterAll(async () => {
// Guard each step so a failure in `beforeAll` doesn't trigger a second,
// misleading "X is not a function" error here that masks the root cause.
try {
await itemTypeCleanup?.();
await orgCleanup?.();
} finally {
await harness?.shutdown();
}
}, 30_000);

test('submitted item lands in Scylla and ClickHouse', async () => {
if (!harness) throw new Error('harness was not initialized');
const itemId = uid();

await harness.request
.post('/api/v1/items/async')
.set('x-api-key', apiKey)
.send({
items: [
{ id: itemId, typeId: itemTypeId, data: { text: 'hello integ' } },
],
})
.expect(202);

const scyllaRow = await waitForItemInScylla(harness.deps, {
orgId,
itemIdentifier: { id: itemId, typeId: itemTypeId },
});
expect(scyllaRow).toBeDefined();
expect(scyllaRow.org_id).toBe(orgId);

const chRow = await waitForItemInClickHouse(harness.deps, {
orgId,
itemIdentifier: { id: itemId, typeId: itemTypeId },
});
expect(chRow).toBeDefined();
}, 60_000);
});
96 changes: 96 additions & 0 deletions server/test/integ/setupIntegrationServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Integration test harness: boots the real IoC container against running infra
* (Postgres, Scylla, ClickHouse, Redis) and starts the item-processing worker
* inline so that submissions land in the data stores within the same process.
*
* Requires the docker-compose stack from `npm run up` and migrations applied
* via `npm run db:update`.
*/
// Load .env before any module that reads process.env (notably the IoC
// container). The unit-test `npm test` path goes through dotenv via its
// NODE_OPTIONS; `test:integ` does not, so we do it here.
import 'dotenv/config';

import * as superTest from 'supertest';

import getBottle, { type Dependencies } from '../../iocContainer/index.js';
import makeServer from '../../server.js';

export type IntegrationServer = {
deps: Dependencies;
request: ReturnType<typeof superTest.agent>;
shutdown: () => Promise<void>;
};

export async function makeIntegrationServer(): Promise<IntegrationServer> {
const bottle = await getBottle();
const deps = bottle.container as Dependencies;

const { app, shutdown: shutdownServer } = await makeServer(deps);
const request = superTest.agent(app);

const workerAbort = new AbortController();
// Run the worker in the background — its run() promise only settles on error
// or shutdown, so we don't await it here.
const workerRun = deps.ItemProcessingWorker.run(workerAbort.signal);
workerRun.catch((err) => {
console.error('ItemProcessingWorker exited with error', err);
});

return {
deps,
request,
async shutdown() {
// Best-effort teardown: run every step even if an earlier one throws,
// so we don't leak the server or shared resources into the next test.
workerAbort.abort();

const runStep = async (
fn: () => Promise<void>,
): Promise<unknown | null> => {
try {
await fn();
return null;
} catch (err) {
return err;
}
};

// Awaited left-to-right inside the array literal, so steps still run
// sequentially — closeSharedResourcesForShutdown depends on the worker
// having closed its Redis connection first.
const teardownErrors = [
await runStep(async () => {
await deps.ItemProcessingWorker.shutdown();
}),
await runStep(async () => {
await shutdownServer();
}),
await runStep(async () => {
// BullMQ's Worker.close() already closes the shared ioredis
// connection, which makes closeSharedResourcesForShutdown throw
// "Connection is closed" when it tries to quit() redis a second
// time. That specific error is benign — every shared resource is
// already torn down — so we swallow it here rather than leak the
// failure into afterAll.
await deps.closeSharedResourcesForShutdown().catch((err) => {
if (
err instanceof Error &&
err.message === 'Connection is closed.'
) {
return;
}
throw err;
});
}),
].filter((e): e is unknown => e !== null);

if (teardownErrors.length > 0) {
throw new AggregateError(
teardownErrors,
'Integration server shutdown failed',
);
}
},
};
}
107 changes: 107 additions & 0 deletions server/test/integ/wait.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Polling helpers for integration tests.
*
* Item submission is async (POST returns 202, worker processes off Redis,
* writes to Scylla and ClickHouse), so tests poll the data stores until the
* row appears or a timeout elapses.
*/
import { type ItemIdentifier } from '@roostorg/types';

import { type Dependencies } from '../../iocContainer/index.js';
import { itemIdentifierToScyllaItemIdentifier } from '../../scylla/index.js';

const DEFAULT_TIMEOUT_MS = 30_000;
const DEFAULT_INTERVAL_MS = 250;

export async function waitFor<T>(
what: string,
check: () => Promise<T | null | undefined>,
opts: { timeoutMs?: number; intervalMs?: number } = {},
): Promise<T> {
const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
const intervalMs = opts.intervalMs ?? DEFAULT_INTERVAL_MS;
const deadline = Date.now() + timeoutMs;

for (;;) {
const result = await check();
if (result != null) return result;
if (Date.now() >= deadline) {
throw new Error(`Timed out after ${timeoutMs}ms waiting for: ${what}`);
}
await new Promise((r) => setTimeout(r, intervalMs));
}
}

/**
* Query Scylla directly rather than going through ItemInvestigationService:
* that service falls back to the partial-items endpoint and the data warehouse
* if Scylla returns nothing, which would mask a real Scylla write failure as a
* passing test.
*
* Filtering by `item_identifier` alone matches the production lookup path
* (`ItemInvestigationService.getItemByIdentifier`), which leans on the
* secondary index on `item_identifier`. The table's partition key is
* `(org_id, synthetic_thread_id)`, so a partial-partition-key restriction
* (`org_id = ?` without `synthetic_thread_id`) would need `ALLOW FILTERING`.
* Callers should still assert on `org_id` after the row comes back to guard
* against the theoretical cross-org collision.
*
* Query errors are intentionally NOT swallowed — a structural Scylla error
* (bad query, schema drift) should surface immediately instead of polling
* itself into a misleading timeout.
*/
export async function waitForItemInScylla(
deps: Pick<Dependencies, 'Scylla'>,
opts: {
orgId: string;
itemIdentifier: ItemIdentifier;
timeoutMs?: number;
},
) {
return waitFor(
`item ${opts.itemIdentifier.id} in Scylla item_submission_by_thread`,
async () => {
const rows = await deps.Scylla.select({
from: 'item_submission_by_thread',
select: '*',
where: [
[
'item_identifier',
'=',
itemIdentifierToScyllaItemIdentifier(opts.itemIdentifier),
],
],
});
return rows.length > 0 ? rows[0] : null;
},
{ timeoutMs: opts.timeoutMs },
);
}
Comment on lines +53 to +79
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly due to how we had to get things working we have a fallback and ignore scylla errors. Given scylla data is ephemeral and only up to 6 months.

If we want a true e2e integration test we need to query scylla directly to avoid fallback showing a false positive.

export async function waitForItemInScylla(
  deps: Pick<Dependencies, 'Scylla'>,
  opts: { orgId: string; itemIdentifier: ItemIdentifier; timeoutMs?: number },
) {
  return waitFor(
    `item ${opts.itemIdentifier.id} in Scylla item_submission_by_thread`,
    async () => {
      const rows = await deps.Scylla
        .select({
          from: 'item_submission_by_thread',
          select: '*',
          where: [['item_identifier', '=', itemIdentifierToScyllaItemIdentifier(opts.itemIdentifier)]],
        })
        .catch(() => []);
      return rows.length > 0 ? rows[0] : null;
    },
    { timeoutMs: opts.timeoutMs },
  );
}

Something like this would be best.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, going to add that into waitForItemInScylla

Comment thread
coderabbitai[bot] marked this conversation as resolved.

export async function waitForItemInClickHouse(
deps: Pick<Dependencies, 'DataWarehouse' | 'Tracer'>,
opts: {
orgId: string;
itemIdentifier: ItemIdentifier;
timeoutMs?: number;
},
) {
const { orgId, itemIdentifier } = opts;
return waitFor(
`item ${itemIdentifier.id} in ClickHouse CONTENT_API_REQUESTS`,
async () => {
const rows = await deps.DataWarehouse.query(
`SELECT item_id, item_type_id, event
FROM analytics.CONTENT_API_REQUESTS
WHERE org_id = ?
AND item_id = ?
AND item_type_id = ?
LIMIT 1`,
deps.Tracer,
[orgId, itemIdentifier.id, itemIdentifier.typeId],
);
return rows.length > 0 ? rows[0] : null;
},
{ timeoutMs: opts.timeoutMs },
);
}
Loading