diff --git a/prover_v2/README.md b/prover_v2/README.md new file mode 100644 index 00000000..9ec8da71 --- /dev/null +++ b/prover_v2/README.md @@ -0,0 +1,300 @@ +# Pulsar Prover v2 + +ZK proof pipeline that syncs Cosmos (Tendermint) blocks, generates settlement proofs using o1js, and submits them to a Mina smart contract. + +## Architecture Overview + +``` +Cosmos Chain (gRPC) + │ + ▼ + Sync Module ──► storeBlock() ──► storeBlockInBlockEpoch() + │ + epoch full? ──yes──► BullMQ: block-prover + │ + ▼ + BlockProver Worker + (ZK leaf proof) + │ + sibling ready? ──yes──► BullMQ: aggregator + │ + ▼ + Aggregator Worker + (merge proof pair) + │ + is root? ──yes──► BullMQ: settler + no──► next aggregation │ + ▼ + Settler Worker + (submit to Mina chain) +``` + +The pipeline is **event-driven**: each stage triggers the next upon completion. There are no polling loops. + +## Core Concepts + +### Block Epoch + +A group of `BLOCK_EPOCH_SIZE` (8) consecutive blocks. When all 8 blocks are synced, a BlockProver job is enqueued. + +``` +BlockEpoch { height: 0, blocks: [block0, block1, ..., block7] } +BlockEpoch { height: 8, blocks: [block8, block9, ..., block15] } +``` + +### Proof Epoch & Binary Tree + +A ProofEpoch holds `PROOF_EPOCH_LEAF_COUNT` (4) leaf proofs and their aggregated parents in a binary tree: + +``` + [6] ← root (settlement index) + / \ + [4] [5] ← aggregated proofs + / \ / \ + [0] [1] [2] [3] ← leaf proofs (from BlockProver) +``` + +`proofs[]` array size = `LEAF_COUNT * 2 - 1` = 7 slots. + +When both siblings exist (e.g. [0] and [1]), an Aggregator job merges them into the parent ([4]). This continues up the tree until the root proof at index 6 is produced, which the Settler submits on-chain. + +### Deterministic Job IDs + +Every BullMQ job uses a deterministic ID: +- BlockProver: `bp:{height}` +- Aggregator: `agg:{height}:{index}` +- Settler: `settle:{height}` + +This prevents duplicate jobs. 
If a server crashes and restarts, the recovery sweep can safely re-enqueue without creating duplicates. + +## Pipeline Stages + +### 1. Sync (`modules/pulsar/`) + +Polls the Cosmos chain via gRPC for new blocks. For each block: + +1. Fetches block header, validator set, and vote extensions +2. Computes validator list hash (Poseidon hash over Mina public keys) +3. Calls `storeBlock()` — upserts block into MongoDB +4. Calls `storeBlockInBlockEpoch()` — places block reference in its epoch slot +5. If the epoch is full (all 8 slots filled), enqueues a `block-prover` BullMQ job + +**Key file:** `modules/pulsar/utils.ts` — `storePulsarBlock()` + +### 2. BlockProver (`modules/processors/block-prover/`) + +Generates a ZK leaf proof from 8 consecutive blocks. + +1. **Idempotency check:** If `proofEpoch.proofs[leafIndex]` already exists, skip ZK computation +2. Fetches the 8 blocks from MongoDB +3. For each consecutive pair, creates a `PulsarBlock` and collects `SignaturePublicKeyList` +4. Calls `GenerateSettlementProof()` (o1js ZK circuit) +5. Stores proof JSON in MongoDB, updates ProofEpoch +6. Calls `tryEnqueueAggregation()` — checks if sibling leaf is ready + +**Key file:** `modules/processors/block-prover/worker.ts` + +### 3. Aggregator (`modules/processors/aggregator/`) + +Merges two sibling proofs into their parent. + +1. **Idempotency check:** If `proofEpoch.proofs[parentIndex]` already exists, skip +2. Fetches left and right proof JSON from MongoDB +3. Calls `MergeSettlementProofs()` (o1js recursive proof merge) +4. Stores merged proof, updates ProofEpoch +5. If parent is the root (settlement index) → triggers Settler +6. Otherwise → triggers next level of aggregation via `tryEnqueueAggregation()` + +**Key file:** `modules/processors/aggregator/worker.ts` + +### 4. Settler (`modules/processors/settler/`) + +Submits the final root proof to the Mina blockchain. + +1. **Idempotency check:** If `proofEpoch.settled === true`, skip +2. 
Fetches the settlement proof from MongoDB +3. Connects to Mina network, instantiates `SettlementContract` +4. Calls `contractInstance.settle(settlementProof)` +5. Marks `settled = true` in MongoDB + +**Key file:** `modules/processors/settler/worker.ts` + +## BullMQ Configuration + +All queues share the same job options: + +| Setting | Value | Purpose | +|---------|-------|---------| +| `attempts` | 3 | Max retries before moving to failed set | +| `backoff` | exponential, 10s base | 10s → 20s → 40s between retries | +| `removeOnComplete` | 24h / 1000 jobs | Auto-cleanup of completed jobs | +| `removeOnFail` | 7 days | Keep failed jobs for debugging | +| `lockDuration` | 5 minutes | Worker must heartbeat within this window | +| `stalledInterval` | 5 seconds | How often BullMQ checks for stalled jobs | +| `concurrency` | 1 per worker | Each worker processes one job at a time | + +Worker counts: 10 block-prover, 10 aggregator, 2 settler. + +### Crash Recovery + +BullMQ handles most crash scenarios automatically: +- **Worker dies mid-job:** Lock expires after `lockDuration`, job is re-queued +- **Redis disconnects:** ioredis auto-reconnects +- **Job fails 3 times:** Moved to BullMQ's "failed" set, monitor alerts + +For edge cases (crash between proof storage and job enqueue), there's a **startup recovery sweep** (`modules/processors/recovery.ts`) that runs on every boot: + +1. Finds full BlockEpochs without corresponding leaf proofs → enqueues BlockProver +2. Finds ProofEpochs with sibling pairs but missing parent → enqueues Aggregator +3. Finds root proofs that aren't settled → enqueues Settler + +Safe to run repeatedly thanks to deterministic job IDs. + +## MongoDB Models + +### Block +``` +{ height, stateRoot, validators[], validatorListHash, voteExt[] } +``` +Raw block data from the Cosmos chain. + +### BlockEpoch +``` +{ height, blocks[8] } +``` +Groups 8 block references. `blocks[i]` is either a Block ObjectId or null. 
+ +### ProofEpoch +``` +{ height, proofs[7], settled } +``` +Binary tree of proofs. `settled` marks whether the root proof has been submitted on-chain. + +### Proof +``` +{ data } +``` +Serialized ZK proof JSON. + +## Monitor (`modules/monitor/`) + +Polls BullMQ queue health every 30 seconds: +- Checks `getFailedCount()`, `getWaitingCount()`, `getActiveCount()` for each queue +- Logs warnings when failed jobs are detected + +## Trigger Logic (`modules/processors/triggers.ts`) + +Generic binary tree navigation used by both BlockProver and Aggregator: + +```typescript +siblingIndex = completedIndex % 2 === 0 ? completedIndex + 1 : completedIndex - 1 +parentIndex = PROOF_EPOCH_LEAF_COUNT + Math.floor(completedIndex / 2) +``` + +- `tryEnqueueAggregation(proofEpoch, completedIndex)` — checks sibling, enqueues merge +- `tryEnqueueSettlement(proofEpoch)` — enqueues settler if root proof exists and not yet settled + +## Key Constants (`modules/utils/constants.ts`) + +| Constant | Value | Description | +|----------|-------|-------------| +| `BLOCK_EPOCH_SIZE` | 8 | Blocks per epoch | +| `PROOF_EPOCH_LEAF_COUNT` | 4 | Leaf proofs per proof epoch | +| `PROOF_EPOCH_SETTLEMENT_INDEX` | 6 | Root proof slot in proofs[] | +| `WORKER_COUNT` | 10 | Workers per queue (except settler: 2) | +| `WORKER_TIMEOUT_MS` | 300,000 | 5 min lock duration | +| `STALLED_INTERVAL_MS` | 5,000 | Stalled check frequency | +| `POLL_INTERVAL_MS` | 5,000 | Cosmos chain polling interval | +| `MONITOR_INTERVAL_MS` | 30,000 | Queue health check interval | + +## Project Structure + +``` +src/modules/ +├── pulsar/ # Cosmos chain sync +│ ├── sync.ts # Block polling loop +│ └── utils.ts # gRPC helpers, storePulsarBlock() +├── processors/ +│ ├── block-prover/ +│ │ └── worker.ts # ZK leaf proof generation +│ ├── aggregator/ +│ │ └── worker.ts # Recursive proof merging +│ ├── settler/ +│ │ └── worker.ts # On-chain settlement +│ ├── triggers.ts # Event-driven stage transitions +│ ├── pipeline.ts # PipelineManager, 
worker lifecycle +│ ├── recovery.ts # Startup recovery sweep +│ └── utils/ +│ ├── queue.ts # BullMQ queue instances +│ ├── jobs.ts # Job type definitions +│ └── jobOptions.ts # Shared job config, deterministic IDs +├── db/ +│ ├── models/ +│ │ ├── block/ # Block schema + utils +│ │ ├── blockEpoch/ # BlockEpoch schema + utils +│ │ ├── proofEpoch/ # ProofEpoch schema + utils +│ │ └── proof/ # Proof schema + utils +│ └── index.ts # Re-exports +├── monitor/ +│ └── monitor.ts # BullMQ queue health monitoring +└── utils/ + ├── constants.ts + ├── interfaces.ts + └── functions.ts +``` + +## What Changed (Refactoring Summary) + +### Problem + +The previous architecture duplicated BullMQ's built-in capabilities in MongoDB: +- `status[]` arrays on BlockEpoch/ProofEpoch tracked "waiting"/"processing"/"done" — but BullMQ already manages job states +- `failCount` fields reimplemented retry logic — but BullMQ has `attempts` + `backoff` +- `timeoutAt` fields reimplemented timeout detection — but BullMQ has `lockDuration` + stalled detection +- Master classes polled MongoDB in `while(true)` loops looking for "waiting" records — creating unnecessary load and latency +- If a server crashed while a record was in "processing" status, it would stay stuck forever + +### Solution + +**MongoDB is now a pure data store.** All job orchestration is handled by BullMQ. 
+ +| Before | After | +|--------|-------| +| Master polling loops | Event-driven push (worker triggers next stage) | +| MongoDB `status[]` fields | BullMQ job states | +| MongoDB `failCount` | BullMQ `attempts` + exponential backoff | +| MongoDB `timeoutAt` | BullMQ `lockDuration` + stalled detection | +| Manual crash recovery (none) | Automatic re-queue + startup recovery sweep | +| 15 hardcoded aggregation patterns | Generic binary tree formula | +| Mongoose transactions in workers | Idempotent upserts | + +### Deleted + +- `processors/base/Master.ts` — Base polling class +- `processors/block-prover/master.ts` — BlockProver polling loop +- `processors/aggregator/master.ts` — Aggregator polling loop +- `processors/settler/master.ts` — Settler polling loop +- `processors/block-prover/utils.ts` — Status registration helpers +- `db/types.ts` — `BlockStatus`, `ProofStatus`, `ProofKind` enums + +### Added + +- `processors/triggers.ts` — Event-driven stage transitions +- `processors/pipeline.ts` — PipelineManager (worker lifecycle + graceful shutdown) +- `processors/recovery.ts` — Startup recovery sweep +- `processors/utils/jobOptions.ts` — Shared BullMQ config + deterministic job ID generators + +### Simplified + +- **BlockEpoch schema:** Removed `status[]`, `epochStatus`, `failCount`, `timeoutAt` +- **ProofEpoch schema:** Removed `status[]`, `kind`, `failCount`, `timeoutAt`; added `settled: boolean` +- **Block schema:** Removed `status`, `timeoutAt` +- **Monitor:** Now checks BullMQ queue health instead of MongoDB failCount + +## Running Tests + +```bash +npx vitest run +``` + +12 test files, 72 tests covering all workers, triggers, recovery, monitor, and database utilities. 
diff --git a/prover_v2/src/modules/db/index.ts b/prover_v2/src/modules/db/index.ts index 42e96971..7f8f08e8 100644 --- a/prover_v2/src/modules/db/index.ts +++ b/prover_v2/src/modules/db/index.ts @@ -19,12 +19,7 @@ export { ProofEpochModel, type IProofEpoch, } from "./models/proofEpoch/ProofEpoch.js"; -export { - getProofEpoch, - storeProofInProofEpoch, - deleteProofEpoch, - incrementProofEpochFailCount, -} from "./models/proofEpoch/utils.js"; +export { getProofEpoch, deleteProofEpoch } from "./models/proofEpoch/utils.js"; // BlockEpoch export { @@ -34,10 +29,5 @@ export { export { getBlockEpoch, storeBlockInBlockEpoch, - updateBlockStatusInEpoch, deleteBlockEpoch, - incrementBlockEpochFailCount, } from "./models/blockEpoch/utils.js"; - -// Types -export type { ProofKind, ProofStatus, BlockStatus } from "./types.js"; diff --git a/prover_v2/src/modules/db/models/block/Block.ts b/prover_v2/src/modules/db/models/block/Block.ts index 47393c84..5d659a63 100644 --- a/prover_v2/src/modules/db/models/block/Block.ts +++ b/prover_v2/src/modules/db/models/block/Block.ts @@ -1,15 +1,12 @@ import mongoose, { Schema, Document } from "mongoose"; import { VoteExt } from "../../../utils/interfaces"; -import { BlockStatus } from "../../types.js"; export interface IBlock extends Document { height: number; - status: BlockStatus; stateRoot: string; validators: string[]; validatorListHash: string; voteExt: VoteExt[]; - timeoutAt?: Date; } const VoteExtSchema = new Schema( @@ -25,16 +22,10 @@ const VoteExtSchema = new Schema( const BlockSchema = new Schema( { height: { type: Number, required: true, unique: true, index: true }, - status: { - type: String, - enum: ["waiting", "processing", "done", "failed"], - default: "waiting", - }, stateRoot: { type: String, required: true }, validators: [{ type: String }], validatorListHash: { type: String, required: true }, voteExt: [VoteExtSchema], - timeoutAt: { type: Date }, }, { timestamps: true }, ); diff --git 
a/prover_v2/src/modules/db/models/block/utils.test.ts b/prover_v2/src/modules/db/models/block/utils.test.ts deleted file mode 100644 index 1e4fcf4c..00000000 --- a/prover_v2/src/modules/db/models/block/utils.test.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { - storeBlock, - getBlock, - fetchBlockRange, - fetchLastStoredBlock, - seedInitialBlocks, -} from "./utils.js"; -import { BlockModel } from "./Block.js"; - -vi.mock("./Block.js"); -vi.mock("../../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -describe("db block utils", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("storeBlock upserts block with timeout and waiting status on insert", async () => { - vi.mocked(BlockModel.updateOne).mockResolvedValue({} as any); - - const block = { - height: 10, - stateRoot: "root", - validators: ["v1", "v2"], - validatorListHash: "hash", - voteExt: [], - } as any; - - await storeBlock(block); - - expect(BlockModel.updateOne).toHaveBeenCalledWith( - { height: 10 }, - expect.objectContaining({ - $set: { - stateRoot: "root", - validators: ["v1", "v2"], - validatorListHash: "hash", - voteExt: [], - }, - $setOnInsert: expect.objectContaining({ - status: "waiting", - timeoutAt: expect.any(Date), - }), - }), - { upsert: true }, - ); - }); - - it("getBlock finds block by height", async () => { - const mockBlock = { height: 5 } as any; - vi.mocked(BlockModel.findOne).mockResolvedValue(mockBlock); - - const result = await getBlock(5); - - expect(BlockModel.findOne).toHaveBeenCalledWith({ height: 5 }); - expect(result).toBe(mockBlock); - }); - - it("fetchBlockRange queries by height range and sorts ascending", async () => { - const mockBlocks = [ - { height: 1 }, - { height: 2 }, - { height: 3 }, - ] as any[]; - const sortMock = vi.fn().mockResolvedValue(mockBlocks); - vi.mocked(BlockModel.find).mockReturnValue({ sort: sortMock } as 
any); - - const result = await fetchBlockRange(1, 3); - - expect(BlockModel.find).toHaveBeenCalledWith({ - height: { $gte: 1, $lte: 3 }, - }); - expect(sortMock).toHaveBeenCalledWith({ height: 1 }); - expect(result).toEqual(mockBlocks); - }); - - it("fetchBlockRange duplicates first block when rangeLow < 0", async () => { - const mockBlocks = [{ height: 0 }, { height: 1 }] as any[]; - const sortMock = vi.fn().mockResolvedValue([...mockBlocks]); - vi.mocked(BlockModel.find).mockReturnValue({ sort: sortMock } as any); - - const result = await fetchBlockRange(-1, 1); - - expect(result.length).toBe(3); - expect(result[0]).toEqual(mockBlocks[0]); - expect(result[1]).toEqual(mockBlocks[0]); - expect(result[2]).toEqual(mockBlocks[1]); - }); - - it("fetchLastStoredBlock returns null and logs warn when no block", async () => { - vi.mocked(BlockModel.findOne).mockReturnValue({ - sort: vi.fn().mockResolvedValue(null), - } as any); - - const result = await fetchLastStoredBlock(); - - expect(result).toBeNull(); - const logger = await import("../../../../logger.js"); - expect(logger.default.warn).toHaveBeenCalledWith( - "No blocks found in the database.", - ); - }); - - it("fetchLastStoredBlock returns last block and logs info", async () => { - const mockBlock = { height: 42 } as any; - vi.mocked(BlockModel.findOne).mockReturnValue({ - sort: vi.fn().mockResolvedValue(mockBlock), - } as any); - - const result = await fetchLastStoredBlock(); - - expect(result).toBe(mockBlock); - const logger = await import("../../../../logger.js"); - expect(logger.default.info).toHaveBeenCalledWith( - "Fetched last stored block at height 42.", - ); - }); - - it("seedInitialBlocks returns early when genesis block exists", async () => { - vi.mocked(BlockModel.exists).mockResolvedValue(true as any); - - await seedInitialBlocks(); - - expect(BlockModel.create).not.toHaveBeenCalled(); - }); -}); - diff --git a/prover_v2/src/modules/db/models/block/utils.ts 
b/prover_v2/src/modules/db/models/block/utils.ts index e5151ec5..78c20d61 100644 --- a/prover_v2/src/modules/db/models/block/utils.ts +++ b/prover_v2/src/modules/db/models/block/utils.ts @@ -1,11 +1,10 @@ import { BlockModel, IBlock } from "./Block.js"; import { BlockData } from "../../../utils/interfaces.js"; -import { TIMEOUT_TIME_MS } from "../../../utils/constants.js"; import logger from "../../../../logger.js"; import { Signature } from "o1js"; export async function storeBlock(block: BlockData) { - await BlockModel.updateOne( + const result = await BlockModel.findOneAndUpdate( { height: block.height }, { $set: { @@ -14,15 +13,13 @@ export async function storeBlock(block: BlockData) { validatorListHash: block.validatorListHash, voteExt: block.voteExt, }, - $setOnInsert: { - status: "waiting", - timeoutAt: new Date(Date.now() + TIMEOUT_TIME_MS), - }, }, - { upsert: true }, + { upsert: true, new: true }, ); logger.info(`Stored block at height ${block.height}.`); + + return result; } export async function getBlock(height: number) { @@ -66,7 +63,6 @@ export async function seedInitialBlocks() { await BlockModel.create({ height: 0, - status: "done", stateRoot: BigInt( "0x" + Buffer.from( @@ -82,7 +78,6 @@ export async function seedInitialBlocks() { await BlockModel.create({ height: 1, - status: "done", stateRoot: BigInt( "0x" + Buffer.from( diff --git a/prover_v2/src/modules/db/models/blockEpoch/BlockEpoch.ts b/prover_v2/src/modules/db/models/blockEpoch/BlockEpoch.ts index c7b9f41b..6e0c02a1 100644 --- a/prover_v2/src/modules/db/models/blockEpoch/BlockEpoch.ts +++ b/prover_v2/src/modules/db/models/blockEpoch/BlockEpoch.ts @@ -1,17 +1,8 @@ import mongoose, { Schema, Document, Types } from "mongoose"; -import { - BLOCK_EPOCH_SIZE, - WORKER_TIMEOUT_MS, -} from "../../../utils/constants.js"; -import { BlockStatus } from "../../types.js"; export interface IBlockEpoch extends Document { height: number; blocks: (Types.ObjectId | null)[]; - status: BlockStatus[]; - 
epochStatus: BlockStatus; - timeoutAt: Date; - failCount: number; } const BlockEpochSchema = new Schema( @@ -24,21 +15,6 @@ const BlockEpochSchema = new Schema( default: null, }, ], - status: { - type: [String], - enum: ["waiting", "processing", "done", "failed"], - default: Array(BLOCK_EPOCH_SIZE).fill("waiting" as BlockStatus), - }, - epochStatus: { - type: String, - enum: ["waiting", "processing", "done", "failed"], - default: "waiting" as BlockStatus, - }, - timeoutAt: { - type: Date, - default: new Date(Date.now() + WORKER_TIMEOUT_MS), - }, - failCount: { type: Number, default: 0 }, }, { timestamps: true }, ); diff --git a/prover_v2/src/modules/db/models/blockEpoch/utils.test.ts b/prover_v2/src/modules/db/models/blockEpoch/utils.test.ts deleted file mode 100644 index 1589973b..00000000 --- a/prover_v2/src/modules/db/models/blockEpoch/utils.test.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { Types } from "mongoose"; -import { - getBlockEpoch, - storeBlockInBlockEpoch, - updateBlockStatusInEpoch, - deleteBlockEpoch, - incrementBlockEpochFailCount, - seedInitialBlocks as seedBlockEpochs, -} from "./utils.js"; -import { BlockEpochModel } from "./BlockEpoch.js"; -import { BlockModel } from "../block/Block.js"; -import { BLOCK_EPOCH_SIZE } from "../../../utils/constants.js"; - -vi.mock("./BlockEpoch.js"); -vi.mock("../block/Block.js"); -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -describe("db blockEpoch utils", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("getBlockEpoch finds epoch by height", async () => { - const mockEpoch = { height: 16 } as any; - vi.mocked(BlockEpochModel.findOne).mockResolvedValue(mockEpoch); - - const result = await getBlockEpoch(16); - - expect(BlockEpochModel.findOne).toHaveBeenCalledWith({ height: 16 }); - expect(result).toBe(mockEpoch); - }); - - 
it("storeBlockInBlockEpoch throws when index is out of range", async () => { - const height = 10; - const blockId = new Types.ObjectId(); - - await expect( - storeBlockInBlockEpoch(height, blockId, -1), - ).rejects.toThrow("Index must be between 0 and"); - await expect( - storeBlockInBlockEpoch(height, blockId, BLOCK_EPOCH_SIZE), - ).rejects.toThrow("Index must be between 0 and"); - }); - - it("storeBlockInBlockEpoch upserts epoch and stores block at computed epoch height", async () => { - const height = 10; - const blockId = new Types.ObjectId(); - vi.mocked(BlockEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 8, - } as any); - - const result = await storeBlockInBlockEpoch(height, blockId, 2); - - const expectedEpochHeight = - Math.floor(height / BLOCK_EPOCH_SIZE) * BLOCK_EPOCH_SIZE; - expect(BlockEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height: expectedEpochHeight }, - expect.objectContaining({ - $setOnInsert: expect.objectContaining({ - height: expectedEpochHeight, - blocks: Array(BLOCK_EPOCH_SIZE).fill(null), - status: Array(BLOCK_EPOCH_SIZE).fill("waiting"), - failCount: 0, - timeoutAt: expect.any(Date), - }), - $set: expect.objectContaining({ - [`blocks.2`]: blockId, - }), - }), - { upsert: true, new: true }, - ); - expect(result).toEqual({ height: 8 }); - }); - - it("updateBlockStatusInEpoch updates status at given index", async () => { - vi.mocked(BlockEpochModel.findOneAndUpdate).mockResolvedValue({} as any); - - await updateBlockStatusInEpoch(8, 1, "processing"); - - expect(BlockEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height: 8 }, - { - $set: { - ["status.1"]: "processing", - }, - }, - ); - }); - - it("updateBlockStatusInEpoch throws when index is out of range", async () => { - await expect( - updateBlockStatusInEpoch(8, -1, "processing"), - ).rejects.toThrow("Index must be between 0 and"); - await expect( - updateBlockStatusInEpoch(8, BLOCK_EPOCH_SIZE, "processing"), - ).rejects.toThrow("Index must be between 0 
and"); - }); - - it("deleteBlockEpoch deletes epoch by height", async () => { - vi.mocked(BlockEpochModel.deleteOne).mockResolvedValue({} as any); - - await deleteBlockEpoch(8); - - expect(BlockEpochModel.deleteOne).toHaveBeenCalledWith({ height: 8 }); - }); - - it("incrementBlockEpochFailCount increments failCount and updates timeoutAt", async () => { - vi.mocked(BlockEpochModel.updateOne).mockResolvedValue({} as any); - - await incrementBlockEpochFailCount(8); - - const call = vi.mocked(BlockEpochModel.updateOne).mock.calls[0][1] as any; - expect(call.$inc).toEqual({ failCount: 1 }); - expect(call.$set.timeoutAt).toBeInstanceOf(Date); - }); - - it("seedInitialBlocks creates epoch when not exists and required blocks present", async () => { - vi.mocked(BlockEpochModel.exists).mockResolvedValue(false as any); - const genesis = { _id: new Types.ObjectId(), height: 0 } as any; - const first = { _id: new Types.ObjectId(), height: 1 } as any; - vi.mocked(BlockModel.findOne) - .mockResolvedValueOnce(genesis) - .mockResolvedValueOnce(first); - vi.mocked(BlockEpochModel.create).mockResolvedValue({} as any); - - await seedBlockEpochs(); - - expect(BlockEpochModel.create).toHaveBeenCalledWith( - expect.objectContaining({ - height: 0, - blocks: expect.arrayContaining([genesis._id, first._id]), - status: expect.any(Array), - }), - ); - }); -}); - diff --git a/prover_v2/src/modules/db/models/blockEpoch/utils.ts b/prover_v2/src/modules/db/models/blockEpoch/utils.ts index 23c5ccda..1d5ec83c 100644 --- a/prover_v2/src/modules/db/models/blockEpoch/utils.ts +++ b/prover_v2/src/modules/db/models/blockEpoch/utils.ts @@ -1,10 +1,6 @@ import { Types } from "mongoose"; -import { BlockEpochModel } from "./BlockEpoch.js"; -import { - BLOCK_EPOCH_SIZE, - WORKER_TIMEOUT_MS, -} from "../../../utils/constants.js"; -import { BlockStatus } from "../../types.js"; +import { BlockEpochModel, IBlockEpoch } from "./BlockEpoch.js"; +import { BLOCK_EPOCH_SIZE } from "../../../utils/constants.js"; 
import logger from "../../../../logger.js"; import { BlockModel } from "../../index.js"; @@ -16,7 +12,7 @@ export async function storeBlockInBlockEpoch( height: number, blockId: Types.ObjectId, index: number, -) { +): Promise<IBlockEpoch | null> { if (index < 0 || index >= BLOCK_EPOCH_SIZE) { throw new Error(`Index must be between 0 and ${BLOCK_EPOCH_SIZE - 1}`); } @@ -30,9 +26,6 @@ export async function storeBlockInBlockEpoch( $setOnInsert: { height: blockEpochHeight, blocks: Array(BLOCK_EPOCH_SIZE).fill(null), - status: Array(BLOCK_EPOCH_SIZE).fill("waiting" as BlockStatus), - failCount: 0, - timeoutAt: new Date(Date.now() + WORKER_TIMEOUT_MS), }, $set: { [`blocks.${index}`]: blockId, @@ -48,50 +41,16 @@ export async function storeBlockInBlockEpoch( return result; } -export async function updateBlockStatusInEpoch( - blockEpochHeight: number, - index: number, - status: BlockStatus, -) { - if (index < 0 || index >= BLOCK_EPOCH_SIZE) { - throw new Error(`Index must be between 0 and ${BLOCK_EPOCH_SIZE - 1}`); - } - - await BlockEpochModel.findOneAndUpdate( - { height: blockEpochHeight }, - { - $set: { - [`status.${index}`]: status, - }, - }, - ); - - logger.info( - `Updated block status in epoch ${blockEpochHeight} at index ${index} to ${status}.`, - ); -} - export async function deleteBlockEpoch(height: number) { await BlockEpochModel.deleteOne({ height }); logger.info(`Deleted block epoch at height ${height}.`); } -export async function incrementBlockEpochFailCount(height: number) { - await BlockEpochModel.updateOne( - { height }, - { - $inc: { failCount: 1 }, - $set: { timeoutAt: new Date(Date.now() + WORKER_TIMEOUT_MS) }, - }, - ); -} - export async function seedInitialBlocks() { const exists = await BlockEpochModel.exists({ height: 0 }); if (exists) return; - // Block koleksiyonundaki genesis bloklarını referans al const genesisBlock = await BlockModel.findOne({ height: 0 }); const firstBlock = await BlockModel.findOne({ height: 1 }); @@ -107,16 +66,9 @@ export async function 
seedInitialBlocks() { ...Array(BLOCK_EPOCH_SIZE - 2).fill(null), ]; - const status = [ - "done" as BlockStatus, - "done" as BlockStatus, - ...Array(BLOCK_EPOCH_SIZE - 2).fill("done" as BlockStatus), - ]; - await BlockEpochModel.create({ height: 0, blocks, - status, }); logger.info("Seeded initial blocks (height 0 and 1)."); diff --git a/prover_v2/src/modules/db/models/proofEpoch/ProofEpoch.ts b/prover_v2/src/modules/db/models/proofEpoch/ProofEpoch.ts index 0b6d0bff..552a3976 100644 --- a/prover_v2/src/modules/db/models/proofEpoch/ProofEpoch.ts +++ b/prover_v2/src/modules/db/models/proofEpoch/ProofEpoch.ts @@ -1,13 +1,9 @@ import mongoose, { Schema, Document, Types } from "mongoose"; -import { ProofKind, ProofStatus } from "../../types.js"; export interface IProofEpoch extends Document { height: number; proofs: (Types.ObjectId | null)[]; - status: ProofStatus[]; - timeoutAt: Date; - kind: ProofKind; - failCount: number; + settled: boolean; } const ProofEpochSchema = new Schema( @@ -20,19 +16,7 @@ const ProofEpochSchema = new Schema( default: null, }, ], - status: [ - { - type: String, - enum: ["waiting", "processing", "done", "failed"], - }, - ], - timeoutAt: { type: Date, required: true }, - kind: { - type: String, - enum: ["blockProof", "aggregation", "settlement", "done"], - required: true, - }, - failCount: { type: Number, default: 0 }, + settled: { type: Boolean, default: false }, }, { timestamps: true }, ); diff --git a/prover_v2/src/modules/db/models/proofEpoch/utils.test.ts b/prover_v2/src/modules/db/models/proofEpoch/utils.test.ts deleted file mode 100644 index 4cf4ea3d..00000000 --- a/prover_v2/src/modules/db/models/proofEpoch/utils.test.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { Types } from "mongoose"; -import { - getProofEpoch, - storeProofInProofEpoch, - deleteProofEpoch, - incrementProofEpochFailCount, -} from "./utils.js"; -import { ProofEpochModel } from "./ProofEpoch.js"; -import { - 
PROOF_EPOCH_LEAF_COUNT, - PROOF_EPOCH_SETTLEMENT_INDEX, -} from "../../../utils/constants.js"; - -vi.mock("./ProofEpoch.js"); -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -describe("db proofEpoch utils", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("getProofEpoch finds epoch by height", async () => { - const mockEpoch = { height: 16 } as any; - vi.mocked(ProofEpochModel.findOne).mockResolvedValue(mockEpoch); - - const result = await getProofEpoch(16); - - expect(ProofEpochModel.findOne).toHaveBeenCalledWith({ height: 16 }); - expect(result).toBe(mockEpoch); - }); - - it("storeProofInProofEpoch throws when index is out of range", async () => { - const height = 10; - const proofId = new Types.ObjectId(); - - await expect( - storeProofInProofEpoch(height, proofId, -1), - ).rejects.toThrow("Index must be between 0 and"); - await expect( - storeProofInProofEpoch( - height, - proofId, - PROOF_EPOCH_SETTLEMENT_INDEX + 1, - ), - ).rejects.toThrow("Index must be between 0 and"); - }); - - it("storeProofInProofEpoch sets proof at index and marks status as done for internal nodes", async () => { - const height = 10; - const proofId = new Types.ObjectId(); - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({} as any); - - const leafIndex = 1; - await storeProofInProofEpoch(height, proofId, leafIndex); - - expect(ProofEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height }, - { - $set: { - [`proofs.${leafIndex}`]: proofId, - }, - }, - ); - - const internalIndex = PROOF_EPOCH_LEAF_COUNT; - await storeProofInProofEpoch(height, proofId, internalIndex); - - expect(ProofEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height }, - { - $set: { - [`proofs.${internalIndex}`]: proofId, - [`status.${internalIndex % PROOF_EPOCH_LEAF_COUNT}`]: - "done", - }, - }, - ); - }); - - it("deleteProofEpoch deletes epoch by height", async () => { - 
vi.mocked(ProofEpochModel.deleteOne).mockResolvedValue({} as any); - - await deleteProofEpoch(8); - - expect(ProofEpochModel.deleteOne).toHaveBeenCalledWith({ height: 8 }); - }); - - it("incrementProofEpochFailCount increments failCount and updates timeoutAt", async () => { - vi.mocked(ProofEpochModel.updateOne).mockResolvedValue({} as any); - - await incrementProofEpochFailCount(8); - - const call = vi.mocked(ProofEpochModel.updateOne).mock.calls[0][1] as any; - expect(call.$inc).toEqual({ failCount: 1 }); - expect(call.$set.timeoutAt).toBeInstanceOf(Date); - }); -}); - diff --git a/prover_v2/src/modules/db/models/proofEpoch/utils.ts b/prover_v2/src/modules/db/models/proofEpoch/utils.ts index c301853f..064aee6e 100644 --- a/prover_v2/src/modules/db/models/proofEpoch/utils.ts +++ b/prover_v2/src/modules/db/models/proofEpoch/utils.ts @@ -1,52 +1,12 @@ -import { Types } from "mongoose"; import { ProofEpochModel } from "./ProofEpoch.js"; -import { - PROOF_EPOCH_LEAF_COUNT, - PROOF_EPOCH_SETTLEMENT_INDEX, - WORKER_TIMEOUT_MS, -} from "../../../utils/constants.js"; import logger from "../../../../logger.js"; export async function getProofEpoch(height: number) { return ProofEpochModel.findOne({ height }); } -export async function storeProofInProofEpoch( - height: number, - proof: Types.ObjectId, - index: number, -) { - if (index < 0 || index > PROOF_EPOCH_SETTLEMENT_INDEX) { - throw new Error("Index must be between 0 and 31"); - } - - const update: Record = { - [`proofs.${index}`]: proof, - }; - - if (index > PROOF_EPOCH_LEAF_COUNT - 1) { - update[`status.${index % PROOF_EPOCH_LEAF_COUNT}`] = "done"; - } - - await ProofEpochModel.findOneAndUpdate({ height }, { $set: update }); - - logger.info( - `Stored proof ${proof.toHexString()} in proof epoch at height ${height} for index ${index}.`, - ); -} - export async function deleteProofEpoch(height: number) { await ProofEpochModel.deleteOne({ height }); logger.info(`Deleted proof epoch at height ${height}.`); } - -export async 
function incrementProofEpochFailCount(height: number) { - await ProofEpochModel.updateOne( - { height }, - { - $inc: { failCount: 1 }, - $set: { timeoutAt: new Date(Date.now() + WORKER_TIMEOUT_MS) }, - }, - ); -} diff --git a/prover_v2/src/modules/db/types.ts b/prover_v2/src/modules/db/types.ts deleted file mode 100644 index d9ffdb5f..00000000 --- a/prover_v2/src/modules/db/types.ts +++ /dev/null @@ -1,3 +0,0 @@ -export type ProofKind = "blockProof" | "aggregation" | "settlement" | "done"; -export type ProofStatus = "waiting" | "processing" | "done" | "failed"; -export type BlockStatus = "waiting" | "processing" | "done" | "failed"; diff --git a/prover_v2/src/modules/monitor/monitor.test.ts b/prover_v2/src/modules/monitor/monitor.test.ts deleted file mode 100644 index 20a3c0d0..00000000 --- a/prover_v2/src/modules/monitor/monitor.test.ts +++ /dev/null @@ -1,257 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { - startMonitor, - checkBlockEpochs, - checkProofEpochs, -} from "./monitor.js"; -import { BlockEpochModel } from "../db/models/blockEpoch/BlockEpoch.js"; -import { ProofEpochModel } from "../db/models/proofEpoch/ProofEpoch.js"; -import { MAX_FAIL_COUNT, MONITOR_INTERVAL_MS } from "../utils/constants.js"; -import * as functions from "../utils/functions.js"; - -vi.mock("../db/models/blockEpoch/BlockEpoch.js"); -vi.mock("../db/models/proofEpoch/ProofEpoch.js"); -vi.mock("../utils/functions.js"); -vi.mock("../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -describe("monitor", () => { - beforeEach(() => { - vi.clearAllMocks(); - vi.mocked(functions.sleep).mockResolvedValue(undefined); - }); - - describe("checkBlockEpochs", () => { - it("marks block epochs as failed when failCount exceeds MAX_FAIL_COUNT", async () => { - const mockEpochs = [ - { - height: 8, - failCount: MAX_FAIL_COUNT + 1, - epochStatus: "processing", - }, - { - height: 16, - failCount: 
MAX_FAIL_COUNT + 2, - epochStatus: "waiting", - }, - ]; - - vi.mocked(BlockEpochModel.find).mockResolvedValue(mockEpochs as any); - vi.mocked(BlockEpochModel.updateOne).mockResolvedValue({} as any); - - const count = await checkBlockEpochs(); - - expect(count).toBe(2); - expect(BlockEpochModel.find).toHaveBeenCalledWith({ - failCount: { $gt: MAX_FAIL_COUNT }, - epochStatus: { $ne: "failed" }, - }); - expect(BlockEpochModel.updateOne).toHaveBeenCalledTimes(2); - expect(BlockEpochModel.updateOne).toHaveBeenCalledWith( - { height: 8 }, - { $set: { epochStatus: "failed" } }, - ); - expect(BlockEpochModel.updateOne).toHaveBeenCalledWith( - { height: 16 }, - { $set: { epochStatus: "failed" } }, - ); - }); - - it("does not mark epochs that are already failed", async () => { - vi.mocked(BlockEpochModel.find).mockResolvedValue([]); - - const count = await checkBlockEpochs(); - - expect(count).toBe(0); - expect(BlockEpochModel.updateOne).not.toHaveBeenCalled(); - }); - - it("does not mark epochs with failCount <= MAX_FAIL_COUNT", async () => { - vi.mocked(BlockEpochModel.find).mockResolvedValue([]); - - const count = await checkBlockEpochs(); - - expect(count).toBe(0); - expect(BlockEpochModel.updateOne).not.toHaveBeenCalled(); - }); - }); - - describe("checkProofEpochs", () => { - it("marks proof epochs as failed when failCount exceeds MAX_FAIL_COUNT", async () => { - const mockEpochs = [ - { - height: 8, - failCount: MAX_FAIL_COUNT + 1, - status: ["waiting", "processing", "done"], - kind: "blockProof", - }, - { - height: 16, - failCount: MAX_FAIL_COUNT + 2, - status: ["waiting", "waiting"], - kind: "aggregation", - }, - ]; - - vi.mocked(ProofEpochModel.find).mockResolvedValue(mockEpochs as any); - vi.mocked(ProofEpochModel.updateOne).mockResolvedValue({} as any); - - const count = await checkProofEpochs(); - - expect(count).toBe(2); - expect(ProofEpochModel.find).toHaveBeenCalledWith({ - failCount: { $gt: MAX_FAIL_COUNT }, - status: { $not: { $all: ["failed"] } }, - }); - 
expect(ProofEpochModel.updateOne).toHaveBeenCalledTimes(2); - expect(ProofEpochModel.updateOne).toHaveBeenCalledWith( - { height: 8 }, - { $set: { status: ["failed", "failed", "failed"] } }, - ); - expect(ProofEpochModel.updateOne).toHaveBeenCalledWith( - { height: 16 }, - { $set: { status: ["failed", "failed"] } }, - ); - }); - - it("does not mark epochs that are already fully failed", async () => { - vi.mocked(ProofEpochModel.find).mockResolvedValue([]); - - const count = await checkProofEpochs(); - - expect(count).toBe(0); - expect(ProofEpochModel.updateOne).not.toHaveBeenCalled(); - }); - - it("does not mark epochs with failCount <= MAX_FAIL_COUNT", async () => { - vi.mocked(ProofEpochModel.find).mockResolvedValue([]); - - const count = await checkProofEpochs(); - - expect(count).toBe(0); - expect(ProofEpochModel.updateOne).not.toHaveBeenCalled(); - }); - }); - - describe("monitorLoop", () => { - it("runs checkBlockEpochs and checkProofEpochs in loop", async () => { - vi.mocked(BlockEpochModel.find).mockResolvedValue([]); - vi.mocked(ProofEpochModel.find).mockResolvedValue([]); - - let callCount = 0; - vi.mocked(functions.sleep).mockImplementation(async () => { - callCount++; - if (callCount > 1) { - throw new Error("Test iteration limit reached"); - } - return Promise.resolve(); - }); - - await expect(startMonitor()).rejects.toThrow("Test iteration limit reached"); - - expect(BlockEpochModel.find).toHaveBeenCalled(); - expect(ProofEpochModel.find).toHaveBeenCalled(); - expect(functions.sleep).toHaveBeenCalledWith(MONITOR_INTERVAL_MS); - }); - - it("logs when epochs are marked as failed", async () => { - const mockBlockEpochs = [ - { - height: 8, - failCount: MAX_FAIL_COUNT + 1, - epochStatus: "processing", - }, - ]; - const mockProofEpochs = [ - { - height: 8, - failCount: MAX_FAIL_COUNT + 1, - status: ["waiting"], - kind: "blockProof", - }, - ]; - - vi.mocked(BlockEpochModel.find).mockResolvedValue(mockBlockEpochs as any); - 
vi.mocked(ProofEpochModel.find).mockResolvedValue(mockProofEpochs as any); - vi.mocked(BlockEpochModel.updateOne).mockResolvedValue({} as any); - vi.mocked(ProofEpochModel.updateOne).mockResolvedValue({} as any); - - let callCount = 0; - vi.mocked(functions.sleep).mockImplementation(async () => { - callCount++; - if (callCount > 1) { - throw new Error("Test iteration limit reached"); - } - return Promise.resolve(); - }); - - await expect(startMonitor()).rejects.toThrow("Test iteration limit reached"); - - const logger = await import("../../logger.js"); - expect(logger.default.info).toHaveBeenCalledWith( - "Monitor check completed", - expect.objectContaining({ - failedBlockEpochs: 1, - failedProofEpochs: 1, - event: "monitor_check", - }), - ); - }); - - it("handles errors gracefully and continues loop", async () => { - vi.mocked(BlockEpochModel.find) - .mockRejectedValueOnce(new Error("DB error")) - .mockResolvedValue([]); - vi.mocked(ProofEpochModel.find).mockResolvedValue([]); - - let callCount = 0; - vi.mocked(functions.sleep).mockImplementation(async () => { - callCount++; - if (callCount > 2) { - throw new Error("Test iteration limit reached"); - } - return Promise.resolve(); - }); - - await expect(startMonitor()).rejects.toThrow("Test iteration limit reached"); - - expect(BlockEpochModel.find).toHaveBeenCalledTimes(3); - const logger = await import("../../logger.js"); - expect(logger.default.error).toHaveBeenCalled(); - }); - }); - - describe("startMonitor", () => { - it("logs start information", async () => { - vi.mocked(BlockEpochModel.find).mockResolvedValue([]); - vi.mocked(ProofEpochModel.find).mockResolvedValue([]); - - let callCount = 0; - vi.mocked(functions.sleep).mockImplementation(async () => { - callCount++; - if (callCount > 1) { - throw new Error("Test iteration limit reached"); - } - return Promise.resolve(); - }); - - await expect(startMonitor()).rejects.toThrow("Test iteration limit reached"); - - const logger = await 
import("../../logger.js"); - expect(logger.default.info).toHaveBeenCalledWith( - "Starting monitor", - expect.objectContaining({ - maxFailCount: MAX_FAIL_COUNT, - intervalMs: MONITOR_INTERVAL_MS, - event: "monitor_start", - }), - ); - }); - }); -}); diff --git a/prover_v2/src/modules/monitor/monitor.ts b/prover_v2/src/modules/monitor/monitor.ts index 75d5331c..42441705 100644 --- a/prover_v2/src/modules/monitor/monitor.ts +++ b/prover_v2/src/modules/monitor/monitor.ts @@ -1,72 +1,36 @@ import logger from "../../logger.js"; -import { BlockEpochModel } from "../db/models/blockEpoch/BlockEpoch.js"; -import { ProofEpochModel } from "../db/models/proofEpoch/ProofEpoch.js"; -import { MAX_FAIL_COUNT, MONITOR_INTERVAL_MS } from "../utils/constants.js"; -import { BlockStatus, ProofStatus } from "../db/types.js"; +import { MONITOR_INTERVAL_MS } from "../utils/constants.js"; +import { blockProverQ, aggregatorQ, settlerQ } from "../processors/utils/queue.js"; import { sleep } from "../utils/functions.js"; -export async function checkBlockEpochs() { - const failedEpochs = await BlockEpochModel.find({ - failCount: { $gt: MAX_FAIL_COUNT }, - epochStatus: { $ne: "failed" }, - }); - - for (const epoch of failedEpochs) { - await BlockEpochModel.updateOne( - { height: epoch.height }, - { $set: { epochStatus: "failed" as BlockStatus } }, - ); - - logger.warn("Block epoch marked as failed", { - height: epoch.height, - failCount: epoch.failCount, - event: "block_epoch_failed", - }); - } - - return failedEpochs.length; -} - -export async function checkProofEpochs() { - const failedEpochs = await ProofEpochModel.find({ - failCount: { $gt: MAX_FAIL_COUNT }, - status: { $not: { $all: ["failed"] } }, - }); - - for (const epoch of failedEpochs) { - const failedStatus: ProofStatus[] = epoch.status.map( - () => "failed" as ProofStatus, - ); - - await ProofEpochModel.updateOne( - { height: epoch.height }, - { $set: { status: failedStatus } }, - ); - - logger.warn("Proof epoch marked as failed", 
{ - height: epoch.height, - failCount: epoch.failCount, - kind: epoch.kind, - event: "proof_epoch_failed", - }); +export async function checkQueueHealth() { + const queues = [ + { name: "block-prover", queue: blockProverQ }, + { name: "aggregator", queue: aggregatorQ }, + { name: "settler", queue: settlerQ }, + ]; + + for (const { name, queue } of queues) { + const failedCount = await queue.getFailedCount(); + const waitingCount = await queue.getWaitingCount(); + const activeCount = await queue.getActiveCount(); + + if (failedCount > 0) { + logger.warn(`Queue "${name}" has failed jobs`, { + queue: name, + failedCount, + waitingCount, + activeCount, + event: "queue_failed_jobs", + }); + } } - - return failedEpochs.length; } async function monitorLoop() { while (true) { try { - const blockEpochCount = await checkBlockEpochs(); - const proofEpochCount = await checkProofEpochs(); - - if (blockEpochCount > 0 || proofEpochCount > 0) { - logger.info("Monitor check completed", { - failedBlockEpochs: blockEpochCount, - failedProofEpochs: proofEpochCount, - event: "monitor_check", - }); - } + await checkQueueHealth(); } catch (error) { logger.error("Error during monitor check", error as Error, { event: "monitor_error", @@ -78,8 +42,7 @@ async function monitorLoop() { } export async function startMonitor() { - logger.info("Starting monitor", { - maxFailCount: MAX_FAIL_COUNT, + logger.info("Starting queue health monitor", { intervalMs: MONITOR_INTERVAL_MS, event: "monitor_start", }); diff --git a/prover_v2/src/modules/processors/aggregator/index.ts b/prover_v2/src/modules/processors/aggregator/index.ts index 6439807d..982b35f0 100644 --- a/prover_v2/src/modules/processors/aggregator/index.ts +++ b/prover_v2/src/modules/processors/aggregator/index.ts @@ -1,3 +1 @@ -import { masterRunner } from "./master"; - -export { masterRunner }; +export { worker } from "./worker.js"; diff --git a/prover_v2/src/modules/processors/aggregator/master.test.ts 
b/prover_v2/src/modules/processors/aggregator/master.test.ts deleted file mode 100644 index 8627cceb..00000000 --- a/prover_v2/src/modules/processors/aggregator/master.test.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { Types } from "mongoose"; -import { MASTER_SLEEP_INTERVAL_MS } from "../../utils/constants.js"; - -vi.mock("../../db/index.js", () => ({ - ProofEpochModel: { - findOne: vi.fn(), - updateOne: vi.fn(), - }, - incrementProofEpochFailCount: vi.fn(), -})); - -vi.mock("../utils/queue.js", () => ({ - aggregatorQ: { - add: vi.fn(), - }, -})); - -vi.mock("../utils/workerConnection.js", () => ({ - connection: {}, -})); - -vi.mock("./worker.js", () => ({ - worker: vi.fn(), -})); - -vi.mock("../../utils/functions.js", () => ({ - sleep: vi.fn(), -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { ProofEpochModel } from "../../db/index.js"; -import { aggregatorQ } from "../utils/queue.js"; -import { sleep } from "../../utils/functions.js"; -import { AggregatorMaster } from "./master.js"; - -describe("aggregator master", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("queues available aggregation jobs and marks status processing", async () => { - const left = new Types.ObjectId(); - const right = new Types.ObjectId(); - const id = new Types.ObjectId(); - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - _id: id, - height: 10, - proofs: [left, right], - status: ["waiting"], - timeoutAt: new Date(Date.now() + 1000), - } as any); - vi.mocked(ProofEpochModel.updateOne).mockResolvedValue({ - modifiedCount: 1, - } as any); - - const m = new AggregatorMaster() as any; - await m.handleTask(); - - expect(ProofEpochModel.updateOne).toHaveBeenCalledTimes(1); - expect(aggregatorQ.add).toHaveBeenCalledWith("aggregator", { - height: 10, - index: 0, - left: left.toString(), - right: 
right.toString(), - }); - expect(ProofEpochModel.updateOne).toHaveBeenCalledWith( - { - _id: id, - "proofs.0": { $ne: null }, - "proofs.1": { $ne: null }, - "status.0": { $eq: "waiting" }, - }, - { $set: { "status.0": "processing" } }, - ); - expect(sleep).not.toHaveBeenCalled(); - }); - - it("sleeps when no epoch", async () => { - vi.mocked(ProofEpochModel.findOne).mockResolvedValue(null as any); - - const m = new AggregatorMaster() as any; - await m.handleTask(); - - expect(aggregatorQ.add).not.toHaveBeenCalled(); - expect(sleep).toHaveBeenCalledWith(MASTER_SLEEP_INTERVAL_MS); - }); - - it("rolls back status when queue add fails", async () => { - const left = new Types.ObjectId(); - const right = new Types.ObjectId(); - const id = new Types.ObjectId(); - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - _id: id, - height: 10, - proofs: [left, right], - status: ["waiting"], - timeoutAt: new Date(Date.now() + 1000), - } as any); - vi.mocked(ProofEpochModel.updateOne).mockResolvedValueOnce({ - modifiedCount: 1, - } as any); - vi.mocked(aggregatorQ.add).mockRejectedValueOnce( - new Error("queue error"), - ); - - const m = new AggregatorMaster() as any; - await expect(m.handleTask()).rejects.toThrow("queue error"); - - const calls = vi.mocked(ProofEpochModel.updateOne).mock.calls; - expect(calls[1][0]).toEqual({ - _id: id, - "status.0": { $eq: "processing" }, - }); - expect(calls[1][1]).toEqual({ - $set: { "status.0": "waiting" }, - }); - }); -}); - diff --git a/prover_v2/src/modules/processors/aggregator/master.ts b/prover_v2/src/modules/processors/aggregator/master.ts deleted file mode 100644 index 82740837..00000000 --- a/prover_v2/src/modules/processors/aggregator/master.ts +++ /dev/null @@ -1,169 +0,0 @@ -import { Types } from "mongoose"; -import { - WORKER_COUNT, - WORKER_TIMEOUT_MS, - STALLED_INTERVAL_MS, - MASTER_SLEEP_INTERVAL_MS, -} from "../../utils/constants.js"; -import { - incrementProofEpochFailCount, - ProofEpochModel, -} from 
"../../db/index.js"; -import { Master } from "../base/Master.js"; -import { aggregatorQ } from "../utils/queue.js"; -import { AggregatorJob } from "../utils/jobs.js"; -import { connection } from "../utils/workerConnection.js"; -import { worker as processAggregation } from "./worker.js"; -import { sleep } from "../../utils/functions.js"; -import logger from "../../../logger.js"; - -export interface Aggregation { - left: Types.ObjectId; - right: Types.ObjectId; - index: number; -} - -const patterns = [ - { startNode: 0, aggregated: 0 }, - { startNode: 2, aggregated: 1 }, - { startNode: 4, aggregated: 2 }, - { startNode: 6, aggregated: 3 }, - { startNode: 8, aggregated: 4 }, - { startNode: 10, aggregated: 5 }, - { startNode: 12, aggregated: 6 }, - { startNode: 14, aggregated: 7 }, - { startNode: 16, aggregated: 8 }, - { startNode: 18, aggregated: 9 }, - { startNode: 20, aggregated: 10 }, - { startNode: 22, aggregated: 11 }, - { startNode: 24, aggregated: 12 }, - { startNode: 26, aggregated: 13 }, - { startNode: 28, aggregated: 14 }, -]; - -export class AggregatorMaster extends Master { - constructor() { - super({ - queueName: "aggregator", - workerLabel: "Aggregator", - connection, - workerCount: WORKER_COUNT, - lockDurationMs: WORKER_TIMEOUT_MS, - stalledIntervalMs: STALLED_INTERVAL_MS, - processJob: async (workerId, job) => { - const epoch = await ProofEpochModel.findOne({ - height: job.data.height, - }); - if (!epoch) { - logger.warn( - `Aggregator worker ${workerId} could not find epoch at height ${job.data.height}`, - ); - return; - } - const aggregation: Aggregation = { - left: new Types.ObjectId(job.data.left), - right: new Types.ObjectId(job.data.right), - index: job.data.index, - }; - await processAggregation(epoch, aggregation); - }, - onJobFailed: async (job) => { - if (job?.data.height) { - await incrementProofEpochFailCount(job.data.height); - } - }, - }); - } - - protected async handleTask(): Promise { - const orClauses = patterns.map((p) => ({ - $and: [ 
- { [`proofs.${p.startNode}`]: { $ne: null } }, - { [`proofs.${p.startNode + 1}`]: { $ne: null } }, - { [`status.${p.aggregated}`]: { $eq: "waiting" } }, - ], - })); - - const epoch = await ProofEpochModel.findOne( - { - $or: orClauses, - timeoutAt: { $gt: new Date() }, - }, - undefined, - { sort: { timeoutAt: 1 } }, - ); - - if (epoch) { - const availablePatterns = patterns.filter( - (p) => - epoch.proofs[p.startNode] && - epoch.proofs[p.startNode + 1] && - epoch.status[p.aggregated] === "waiting", - ); - - if (availablePatterns.length === 0) { - logger.warn( - `Epoch ${epoch.height} matched query but has no valid aggregation slots, skipping`, - ); - await sleep(MASTER_SLEEP_INTERVAL_MS); - } else { - for (const p of availablePatterns) { - const leftId = epoch.proofs[p.startNode] as Types.ObjectId; - const rightId = epoch.proofs[ - p.startNode + 1 - ] as Types.ObjectId; - const claimed = await ProofEpochModel.updateOne( - { - _id: epoch._id, - [`proofs.${p.startNode}`]: { $ne: null }, - [`proofs.${p.startNode + 1}`]: { $ne: null }, - [`status.${p.aggregated}`]: { $eq: "waiting" }, - }, - { $set: { [`status.${p.aggregated}`]: "processing" } }, - ); - - if (!claimed.modifiedCount) continue; - - try { - await aggregatorQ.add("aggregator", { - height: epoch.height, - index: p.aggregated, - left: leftId.toString(), - right: rightId.toString(), - }); - logger.debug( - `Pushed aggregator job for epoch ${epoch.height}, aggregation index ${p.aggregated}`, - { - epochHeight: epoch.height, - index: p.aggregated, - event: "aggregator_task_queued", - }, - ); - } catch (error) { - await ProofEpochModel.updateOne( - { - _id: epoch._id, - [`status.${p.aggregated}`]: { - $eq: "processing", - }, - }, - { - $set: { - [`status.${p.aggregated}`]: "waiting", - }, - }, - ); - throw error; - } - } - } - } else { - await sleep(MASTER_SLEEP_INTERVAL_MS); - } - } -} - -export async function masterRunner() { - const master = new AggregatorMaster(); - await master.run(); -} diff --git 
a/prover_v2/src/modules/processors/aggregator/worker.test.ts b/prover_v2/src/modules/processors/aggregator/worker.test.ts deleted file mode 100644 index 6097985d..00000000 --- a/prover_v2/src/modules/processors/aggregator/worker.test.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { Types } from "mongoose"; -import { PROOF_EPOCH_LEAF_COUNT } from "../../utils/constants.js"; - -vi.mock("../../db/models/proofEpoch/ProofEpoch.js", () => ({ - ProofEpochModel: { - findOneAndUpdate: vi.fn(), - }, -})); - -vi.mock("../../db/models/proof/utils.js", () => ({ - getProof: vi.fn(), - storeProof: vi.fn(), -})); - -vi.mock("pulsar-contracts", () => ({ - SettlementProof: { - fromJSON: vi.fn(async (j: any) => ({ j })), - }, - MergeSettlementProofs: vi.fn(async () => ({ - toJSON: () => ({ merged: true }), - })), -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { ProofEpochModel } from "../../db/models/proofEpoch/ProofEpoch.js"; -import { getProof, storeProof } from "../../db/models/proof/utils.js"; -import { worker } from "./worker.js"; - -describe("aggregator worker", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("skips when already done after failure", async () => { - const task: any = { height: 1, failCount: 1, status: ["done"] }; - const aggregation: any = { - left: new Types.ObjectId(), - right: new Types.ObjectId(), - index: 0, - }; - - await worker(task, aggregation); - - expect(getProof).not.toHaveBeenCalled(); - expect(storeProof).not.toHaveBeenCalled(); - expect(ProofEpochModel.findOneAndUpdate).not.toHaveBeenCalled(); - }); - - it("throws when one of proofs is missing", async () => { - vi.mocked(getProof).mockResolvedValueOnce(null as any); - vi.mocked(getProof).mockResolvedValueOnce({} as any); - - const task: any = { height: 1, failCount: 0, status: ["waiting"] }; - const aggregation: any = { 
- left: new Types.ObjectId(), - right: new Types.ObjectId(), - index: 0, - }; - - await expect(worker(task, aggregation)).rejects.toThrow( - "One of the proofs to aggregate is missing.", - ); - }); - - it("stores aggregated proof and marks status done", async () => { - const aggId = new Types.ObjectId(); - vi.mocked(getProof).mockResolvedValue({} as any); - vi.mocked(storeProof).mockResolvedValue(aggId as any); - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({} as any); - - const task: any = { height: 10, failCount: 0, status: ["waiting"] }; - const aggregation: any = { - left: new Types.ObjectId(), - right: new Types.ObjectId(), - index: 0, - }; - - await worker(task, aggregation); - - expect(storeProof).toHaveBeenCalledWith(JSON.stringify({ merged: true })); - expect(ProofEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height: 10 }, - { - $set: { - [`proofs.${PROOF_EPOCH_LEAF_COUNT + 0}`]: aggId, - [`status.0`]: "done", - }, - }, - ); - }); -}); - diff --git a/prover_v2/src/modules/processors/aggregator/worker.ts b/prover_v2/src/modules/processors/aggregator/worker.ts index 79bd6f8d..a6d3f68d 100644 --- a/prover_v2/src/modules/processors/aggregator/worker.ts +++ b/prover_v2/src/modules/processors/aggregator/worker.ts @@ -1,24 +1,41 @@ -import { - type IProofEpoch, - ProofEpochModel, -} from "../../db/models/proofEpoch/ProofEpoch.js"; +import { Types } from "mongoose"; + +import { ProofEpochModel } from "../../db/models/proofEpoch/ProofEpoch.js"; import { getProof, storeProof } from "../../db/models/proof/utils.js"; -import { ProofKind, ProofStatus } from "../../db/types.js"; import logger from "../../../logger.js"; -import { Aggregation } from "./master.js"; -import { PROOF_EPOCH_LEAF_COUNT } from "../../utils/constants.js"; +import { + PROOF_EPOCH_LEAF_COUNT, + PROOF_EPOCH_SETTLEMENT_INDEX, +} from "../../utils/constants.js"; +import { AggregatorJob } from "../utils/jobs.js"; +import { tryEnqueueAggregation, tryEnqueueSettlement } from 
"../triggers.js"; import { MergeSettlementProofs, SettlementProof } from "pulsar-contracts"; -export async function worker(task: IProofEpoch, aggregation: Aggregation) { - if (task.failCount > 0 && task.status[aggregation.index] === "done") { +export async function worker(task: AggregatorJob) { + const { height, index, left, right } = task; + const parentProofIndex = PROOF_EPOCH_LEAF_COUNT + index; + + // Idempotency: skip if this aggregation is already done + const proofEpoch = await ProofEpochModel.findOne({ height }); + if (!proofEpoch) { + throw new Error(`ProofEpoch at height ${height} not found.`); + } + + if (proofEpoch.proofs[parentProofIndex]) { logger.info( - `Skipping aggregation for epoch ${task.height}, index ${aggregation.index} because it is already done.`, + `Aggregation ${index} for epoch ${height} already done, skipping`, ); + // Still trigger next stage in case it was missed + if (parentProofIndex === PROOF_EPOCH_SETTLEMENT_INDEX) { + await tryEnqueueSettlement(proofEpoch); + } else { + await tryEnqueueAggregation(proofEpoch, parentProofIndex); + } return; } - const leftProofJson = await getProof(aggregation.left); - const rightProofJson = await getProof(aggregation.right); + const leftProofJson = await getProof(new Types.ObjectId(left)); + const rightProofJson = await getProof(new Types.ObjectId(right)); if (!leftProofJson || !rightProofJson) { throw new Error("One of the proofs to aggregate is missing."); @@ -31,29 +48,33 @@ export async function worker(task: IProofEpoch, aggregation: Aggregation) { const aggregatedProofId = await storeProof(aggregatedProofJson); - if (!aggregatedProofId) { - throw new Error("Failed to store aggregated proof."); - } - - await ProofEpochModel.findOneAndUpdate( - { height: task.height }, + const updatedEpoch = await ProofEpochModel.findOneAndUpdate( + { height }, { $set: { - [`proofs.${PROOF_EPOCH_LEAF_COUNT + aggregation.index}`]: - aggregatedProofId, - [`status.${aggregation.index}`]: "done" as ProofStatus, + 
[`proofs.${parentProofIndex}`]: aggregatedProofId, }, }, + { new: true }, ); logger.info( - `Aggregated proof for epoch at height ${task.height} stored in slot ${PROOF_EPOCH_LEAF_COUNT + aggregation.index}.`, + `Aggregated proof for epoch ${height} stored in slot ${parentProofIndex}`, { aggregatedProofId: aggregatedProofId.toHexString(), - index: PROOF_EPOCH_LEAF_COUNT + aggregation.index, + index: parentProofIndex, event: "aggregated_proof_stored", }, ); + + if (!updatedEpoch) return; + + // Trigger next stage + if (parentProofIndex === PROOF_EPOCH_SETTLEMENT_INDEX) { + await tryEnqueueSettlement(updatedEpoch); + } else { + await tryEnqueueAggregation(updatedEpoch, parentProofIndex); + } } async function generateAggregatedProof( diff --git a/prover_v2/src/modules/processors/base/Master.test.ts b/prover_v2/src/modules/processors/base/Master.test.ts deleted file mode 100644 index e70e5bbb..00000000 --- a/prover_v2/src/modules/processors/base/Master.test.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; - -vi.mock("bullmq", () => { - class WorkerMock { - static instances: any[] = []; - handlers: Record = {}; - constructor( - public queueName: string, - public processor: any, - public opts: any, - ) { - WorkerMock.instances.push(this); - } - on(event: string, handler: any) { - this.handlers[event] = handler; - return this; - } - } - return { Worker: WorkerMock }; -}); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { Worker } from "bullmq"; -import { Master } from "./Master.js"; - -class TestMaster extends Master<{ a: number }> { - public async init() { - return await (this as any).initializeWorkers(); - } - public async makeWorker(id: number) { - return await (this as any).createWorker(id); - } - protected async handleTask(): Promise {} -} - -describe("processors base Master", () => { - beforeEach(() => { - 
vi.clearAllMocks(); - (Worker as any).instances = []; - }); - - it("initializeWorkers creates workerCount workers", async () => { - const m = new TestMaster({ - queueName: "q", - workerLabel: "L", - connection: {} as any, - workerCount: 3, - lockDurationMs: 111, - stalledIntervalMs: 222, - processJob: vi.fn(async () => {}), - }); - - await m.init(); - - expect((Worker as any).instances.length).toBe(3); - expect((Worker as any).instances[0].opts.lockDuration).toBe(111); - expect((Worker as any).instances[0].opts.stalledInterval).toBe(222); - }); - - it("worker processor calls processJob", async () => { - const processJob = vi.fn(async () => {}); - const m = new TestMaster({ - queueName: "q", - workerLabel: "L", - connection: {} as any, - workerCount: 1, - lockDurationMs: 1, - stalledIntervalMs: 1, - processJob, - }); - - const w: any = await m.makeWorker(0); - await w.processor({ id: "1", data: { a: 1 } }); - - expect(processJob).toHaveBeenCalledWith(0, { id: "1", data: { a: 1 } }); - }); - - it("failed handler calls onJobFailed when provided", async () => { - const onJobFailed = vi.fn(async () => {}); - const m = new TestMaster({ - queueName: "q", - workerLabel: "L", - connection: {} as any, - workerCount: 1, - lockDurationMs: 1, - stalledIntervalMs: 1, - processJob: vi.fn(async () => {}), - onJobFailed, - }); - - const w: any = await m.makeWorker(7); - await w.handlers.failed({ id: "x", data: { a: 2 } }, new Error("e")); - - expect(onJobFailed).toHaveBeenCalled(); - }); -}); - diff --git a/prover_v2/src/modules/processors/base/Master.ts b/prover_v2/src/modules/processors/base/Master.ts deleted file mode 100644 index a4962303..00000000 --- a/prover_v2/src/modules/processors/base/Master.ts +++ /dev/null @@ -1,121 +0,0 @@ -import { Job, Worker } from "bullmq"; -import type { ConnectionOptions } from "bullmq"; -import logger from "../../../logger.js"; - -export interface MasterConfig { - // queue name (same as Worker queue name) - queueName: string; - // label for 
logs (e.g. "block-prover", "aggregator", "settler") - workerLabel: string; - connection: ConnectionOptions; - workerCount: number; - lockDurationMs: number; - stalledIntervalMs: number; - // process a single job (called by each worker) - processJob: ( - workerId: number, - job: Job, - ) => Promise; - // called when a job fails (e.g. increment fail count) - onJobFailed?: ( - job: Job | undefined, - ) => Promise; -} - -export abstract class Master { - protected readonly config: MasterConfig; - protected readonly workers: Worker[] = []; - - constructor(config: MasterConfig) { - this.config = config; - } - - protected abstract handleTask(): Promise; - - protected async createWorker( - workerId: number, - ): Promise> { - const { - queueName, - workerLabel, - connection, - lockDurationMs, - stalledIntervalMs, - processJob, - onJobFailed, - } = this.config; - - const worker = new Worker( - queueName, - async (job) => { - logger.info( - `${workerLabel} worker ${workerId} started job ${job.id} for job data`, - { jobId: job.id, data: job.data }, - ); - await processJob(workerId, job); - logger.info( - `${workerLabel} worker ${workerId} finished job ${job.id}`, - { jobId: job.id }, - ); - }, - { - connection, - concurrency: 1, - lockDuration: lockDurationMs, - stalledInterval: stalledIntervalMs, - }, - ); - - worker.on("completed", (job) => { - logger.info( - `${workerLabel} worker ${workerId} completed job ${job.id}`, - { jobId: job?.id }, - ); - }); - - worker.on("failed", async (job, err) => { - if (onJobFailed && job) await onJobFailed(job); - logger.error( - `${workerLabel} worker ${workerId} failed job ${job?.id}`, - err as Error, - { jobId: job?.id, data: job?.data }, - ); - }); - - worker.on("error", (err) => { - logger.error( - `${workerLabel} worker ${workerId} error`, - err as Error, - ); - }); - - worker.on("closed", async () => { - logger.warn( - `${workerLabel} worker ${workerId} closed (crashed or manually closed), creating replacement`, - ); - const index = 
this.workers.indexOf(worker); - if (index !== -1) this.workers.splice(index, 1); - await this.createWorker(workerId); - }); - - this.workers.push(worker); - return worker; - } - - protected async initializeWorkers(): Promise { - const { workerCount, workerLabel } = this.config; - for (let i = 0; i < workerCount; i++) { - await this.createWorker(i); - } - logger.info( - `Initialized ${workerCount} workers for ${workerLabel} queue`, - ); - } - - async run(): Promise { - await this.initializeWorkers(); - while (true) { - await this.handleTask(); - } - } -} diff --git a/prover_v2/src/modules/processors/base/index.ts b/prover_v2/src/modules/processors/base/index.ts deleted file mode 100644 index 667baa2b..00000000 --- a/prover_v2/src/modules/processors/base/index.ts +++ /dev/null @@ -1 +0,0 @@ -export { Master, type MasterConfig } from "./Master.js"; diff --git a/prover_v2/src/modules/processors/block-prover/index.ts b/prover_v2/src/modules/processors/block-prover/index.ts index 6439807d..982b35f0 100644 --- a/prover_v2/src/modules/processors/block-prover/index.ts +++ b/prover_v2/src/modules/processors/block-prover/index.ts @@ -1,3 +1 @@ -import { masterRunner } from "./master"; - -export { masterRunner }; +export { worker } from "./worker.js"; diff --git a/prover_v2/src/modules/processors/block-prover/master.test.ts b/prover_v2/src/modules/processors/block-prover/master.test.ts deleted file mode 100644 index 3bfcf113..00000000 --- a/prover_v2/src/modules/processors/block-prover/master.test.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; -import { MASTER_SLEEP_INTERVAL_MS } from "../../utils/constants.js"; - -vi.mock("../../db/index.js", () => ({ - BlockEpochModel: { - findOneAndUpdate: vi.fn(), - updateOne: vi.fn(), - }, - incrementBlockEpochFailCount: vi.fn(), -})); - -vi.mock("../utils/queue.js", () => ({ - blockProverQ: { - add: vi.fn(), - }, -})); - -vi.mock("../utils/workerConnection.js", () => ({ - connection: {}, 
-})); - -vi.mock("./worker.js", () => ({ - worker: vi.fn(), -})); - -vi.mock("../../utils/functions.js", () => ({ - sleep: vi.fn(), -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { BlockEpochModel } from "../../db/index.js"; -import { blockProverQ } from "../utils/queue.js"; -import { sleep } from "../../utils/functions.js"; -import { BlockProverMaster } from "./master.js"; - -describe("block-prover master", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("queues exactly one job when epoch found", async () => { - vi.mocked(BlockEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 8, - } as any); - - const m = new BlockProverMaster() as any; - await m.handleTask(); - - expect(blockProverQ.add).toHaveBeenCalledTimes(1); - expect(blockProverQ.add).toHaveBeenCalledWith("block-prover", { - height: 8, - }); - expect(sleep).not.toHaveBeenCalled(); - }); - - it("sleeps when no epoch", async () => { - vi.mocked(BlockEpochModel.findOneAndUpdate).mockResolvedValue( - null as any, - ); - - const m = new BlockProverMaster() as any; - await m.handleTask(); - - expect(blockProverQ.add).not.toHaveBeenCalled(); - expect(sleep).toHaveBeenCalledWith(MASTER_SLEEP_INTERVAL_MS); - }); - - it("rolls back epochStatus when queue add fails", async () => { - vi.mocked(BlockEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 8, - } as any); - vi.mocked(blockProverQ.add).mockRejectedValueOnce( - new Error("queue error"), - ); - - const m = new BlockProverMaster() as any; - await expect(m.handleTask()).rejects.toThrow("queue error"); - - expect(BlockEpochModel.updateOne).toHaveBeenCalledWith( - { height: 8, epochStatus: "processing" }, - { $set: { epochStatus: "waiting" } }, - ); - }); -}); diff --git a/prover_v2/src/modules/processors/block-prover/master.ts b/prover_v2/src/modules/processors/block-prover/master.ts deleted file mode 100644 index 
aec9e840..00000000 --- a/prover_v2/src/modules/processors/block-prover/master.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { - WORKER_COUNT, - WORKER_TIMEOUT_MS, - STALLED_INTERVAL_MS, - MASTER_SLEEP_INTERVAL_MS, -} from "../../utils/constants.js"; -import { BlockEpochModel, incrementBlockEpochFailCount } from "../../db/index.js"; -import { Master } from "../base/Master.js"; -import { connection } from "../utils/workerConnection.js"; -import { blockProverQ } from "../utils/queue.js"; -import { BlockProverJob } from "../utils/jobs.js"; -import { sleep } from "../../utils/functions.js"; -import logger from "../../../logger.js"; -import { worker as processTask } from "./worker.js"; - -export class BlockProverMaster extends Master { - constructor() { - super({ - queueName: "block-prover", - workerLabel: "Block-prover", - connection, - workerCount: WORKER_COUNT, - lockDurationMs: WORKER_TIMEOUT_MS, - stalledIntervalMs: STALLED_INTERVAL_MS, - processJob: async (_, job) => { - await processTask({ height: job.data.height }); - }, - onJobFailed: async (job) => { - if (job?.data.height) { - await incrementBlockEpochFailCount(job.data.height); - } - }, - }); - } - - protected async handleTask(): Promise { - const epoch = await BlockEpochModel.findOneAndUpdate( - { - blocks: { $not: { $elemMatch: { $eq: null } } }, - epochStatus: { $eq: "waiting" }, - }, - { - $set: { epochStatus: "processing" }, - }, - { - sort: { height: 1 }, - new: true, - }, - ); - - if (epoch) { - try { - await blockProverQ.add("block-prover", { height: epoch.height }); - logger.debug( - `Pushed epoch task to queue: epoch starting at height ${epoch.height}`, - { epochHeight: epoch.height, event: "epoch_task_queued" }, - ); - } catch (error) { - await BlockEpochModel.updateOne( - { height: epoch.height, epochStatus: "processing" }, - { $set: { epochStatus: "waiting" } }, - ); - throw error; - } - } else { - await sleep(MASTER_SLEEP_INTERVAL_MS); - } - } -} - -export async function masterRunner() { - const 
master = new BlockProverMaster(); - await master.run(); -} diff --git a/prover_v2/src/modules/processors/block-prover/utils.ts b/prover_v2/src/modules/processors/block-prover/utils.ts deleted file mode 100644 index 3806c106..00000000 --- a/prover_v2/src/modules/processors/block-prover/utils.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { BlockStatus } from "../../db/types.js"; -import logger from "../../../logger.js"; -import { BlockEpochModel } from "../../db/models/blockEpoch/BlockEpoch.js"; - -async function registerBlock(blockEpochHeight: number, index: number) { - const result = await BlockEpochModel.updateOne( - { - height: blockEpochHeight, - [`status.${index}`]: "waiting" as BlockStatus, - }, - { $set: { [`status.${index}`]: "processing" as BlockStatus } }, - ); - - if (!result.matchedCount) { - throw new Error( - `Block at index ${index} in epoch ${blockEpochHeight} is not in 'waiting' status.`, - ); - } - - logger.info( - `Registered block at index ${index} in epoch ${blockEpochHeight} as 'processing'.`, - ); -} - -export { registerBlock }; diff --git a/prover_v2/src/modules/processors/block-prover/worker.test.ts b/prover_v2/src/modules/processors/block-prover/worker.test.ts deleted file mode 100644 index b2735805..00000000 --- a/prover_v2/src/modules/processors/block-prover/worker.test.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; - -vi.mock("mongoose", () => { - const startSession = vi.fn(async () => ({ - withTransaction: async (fn: any) => await fn(), - endSession: async () => {}, - })); - return { default: { startSession }, startSession }; -}); - -vi.mock("../../db/index.js", () => ({ - ProofEpochModel: { - findOne: vi.fn(), - }, - BlockEpochModel: { - findOne: vi.fn(), - findOneAndUpdate: vi.fn(), - }, - storeProof: vi.fn(), - fetchBlockRange: vi.fn(), -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { 
BlockEpochModel, ProofEpochModel, fetchBlockRange, storeProof } from "../../db/index.js"; -import { worker } from "./worker.js"; - -describe("block-prover worker", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("throws when epoch not found", async () => { - vi.mocked(BlockEpochModel.findOne).mockResolvedValue(null as any); - - await expect(worker({ height: 8 } as any)).rejects.toThrow( - "BlockEpoch at height 8 not found.", - ); - }); - - it("skips proof generation when proofs already exist after failures", async () => { - vi.mocked(BlockEpochModel.findOne).mockResolvedValue({ - height: 8, - failCount: 1, - epochStatus: "processing", - } as any); - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - height: 8, - kind: "blockProof", - proofs: [1, null], - } as any); - - await worker({ height: 8 } as any); - - expect(fetchBlockRange).not.toHaveBeenCalled(); - expect(storeProof).not.toHaveBeenCalled(); - expect(BlockEpochModel.findOneAndUpdate).not.toHaveBeenCalled(); - }); -}); - diff --git a/prover_v2/src/modules/processors/block-prover/worker.ts b/prover_v2/src/modules/processors/block-prover/worker.ts index 5603dffe..dd8fa365 100644 --- a/prover_v2/src/modules/processors/block-prover/worker.ts +++ b/prover_v2/src/modules/processors/block-prover/worker.ts @@ -1,20 +1,15 @@ -import { Types } from "mongoose"; -import mongoose from "mongoose"; - import { ProofEpochModel, - BlockEpochModel, - storeProof, fetchBlockRange, + storeProof, } from "../../db/index.js"; import { - WORKER_TIMEOUT_MS, BLOCK_EPOCH_SIZE, PROOF_EPOCH_LEAF_COUNT, } from "../../utils/constants.js"; -import { BlockStatus, ProofKind, ProofStatus } from "../../db/types.js"; import logger from "../../../logger.js"; import { BlockProverJob } from "../utils/jobs.js"; +import { tryEnqueueAggregation } from "../triggers.js"; import { GeneratePulsarBlock, GenerateSettlementProof, @@ -24,51 +19,44 @@ import { Field, PublicKey, Signature } from "o1js"; export async function worker(task: 
BlockProverJob) { const blockEpochHeight = task.height; + const leafIndex = + (blockEpochHeight / BLOCK_EPOCH_SIZE) % PROOF_EPOCH_LEAF_COUNT; + + // Idempotency: skip if leaf proof already exists + const existing = await ProofEpochModel.findOne({ + height: blockEpochHeight, + }); + if (existing?.proofs[leafIndex]) { + logger.info( + `Block proof for epoch ${blockEpochHeight} already exists at leaf ${leafIndex}, skipping`, + ); + // Still trigger next stage in case it was missed + await tryEnqueueAggregation(existing, leafIndex); + return; + } - const session = await mongoose.startSession(); - try { - await session.withTransaction(async () => { - const epoch = await BlockEpochModel.findOne({ + const proofId = await createProof(blockEpochHeight); + + const proofEpoch = await ProofEpochModel.findOneAndUpdate( + { height: blockEpochHeight }, + { + $setOnInsert: { height: blockEpochHeight, - epochStatus: { $eq: "processing" as BlockStatus }, - }); - - if (!epoch) { - throw new Error( - `BlockEpoch at height ${blockEpochHeight} not found.`, - ); - } - - if (epoch.failCount > 0) { - const proofEpoch = await ProofEpochModel.findOne({ - height: blockEpochHeight, - kind: "blockProof" as ProofKind, - }); - - if (proofEpoch && proofEpoch.proofs.some((p) => p !== null)) { - logger.info( - `Skipping block proof generation for epoch starting at height ${blockEpochHeight} because proofs already exist after previous failures.`, - ); - return; - } - } - - const proofId = await createProof(blockEpochHeight); - - await createOrUpdateProofEpoch(epoch.height, proofId); - - await BlockEpochModel.findOneAndUpdate( - { height: blockEpochHeight }, - { $set: { epochStatus: "done" as BlockStatus } }, - ); - - logger.info( - `Processed block epoch starting at height ${blockEpochHeight} and stored proofs in proof epochs.`, - ); - }); - } finally { - await session.endSession(); - } + proofs: Array(PROOF_EPOCH_LEAF_COUNT * 2 - 1).fill(null), + settled: false, + }, + $set: { + 
[`proofs.${leafIndex}`]: proofId, + }, + }, + { upsert: true, new: true }, + ); + + logger.info( + `Stored block proof for epoch ${blockEpochHeight} at leaf index ${leafIndex}`, + ); + + await tryEnqueueAggregation(proofEpoch, leafIndex); } async function createProof(height: number) { @@ -79,11 +67,7 @@ async function createProof(height: number) { if (blockDocs.length !== BLOCK_EPOCH_SIZE) { throw new Error( - `Expected ${ - BLOCK_EPOCH_SIZE - } blocks for proof starting at height ${height}, but got ${ - blockDocs.length - }`, + `Expected ${BLOCK_EPOCH_SIZE} blocks for proof starting at height ${height}, but got ${blockDocs.length}`, ); } @@ -126,37 +110,3 @@ async function createProof(height: number) { return proofId; } - -/** - * Creates a new block epoch document if it does not exist and sets the block at the given height - */ -async function createOrUpdateProofEpoch( - height: number, - proofId: Types.ObjectId, -) { - const result = await ProofEpochModel.findOneAndUpdate( - { height: height }, - { - $setOnInsert: { - height: height, - kind: "blockProof" as ProofKind, - proofs: Array(PROOF_EPOCH_LEAF_COUNT * 2 - 1).fill(null), - status: Array(PROOF_EPOCH_LEAF_COUNT - 1).fill( - "waiting" as ProofStatus, - ), - failCount: 0, - timeoutAt: new Date(Date.now() + WORKER_TIMEOUT_MS), - }, - $set: { - [`proofs.${height % BLOCK_EPOCH_SIZE}`]: proofId, - }, - }, - { upsert: true, new: true }, - ); - - logger.info( - `Created proof epoch for first height ${height} with proof for block ${height}`, - ); - - return result; -} diff --git a/prover_v2/src/modules/processors/pipeline.ts b/prover_v2/src/modules/processors/pipeline.ts new file mode 100644 index 00000000..44c3f1cc --- /dev/null +++ b/prover_v2/src/modules/processors/pipeline.ts @@ -0,0 +1,101 @@ +import { Worker } from "bullmq"; + +import { + WORKER_COUNT, + WORKER_TIMEOUT_MS, + STALLED_INTERVAL_MS, +} from "../utils/constants.js"; +import { connection } from "./utils/workerConnection.js"; +import { BlockProverJob, 
AggregatorJob, SettlerJob } from "./utils/jobs.js"; +import { worker as processBlockProver } from "./block-prover/worker.js"; +import { worker as processAggregation } from "./aggregator/worker.js"; +import { worker as processSettlement } from "./settler/worker.js"; +import { runStartupRecovery } from "./recovery.js"; +import logger from "../../logger.js"; + +export class PipelineManager { + private workers: Worker[] = []; + + async start(): Promise { + await runStartupRecovery(); + + this.createWorkers( + "block-prover", + WORKER_COUNT, + async (job) => processBlockProver(job.data), + ); + + this.createWorkers( + "aggregator", + WORKER_COUNT, + async (job) => processAggregation(job.data), + ); + + this.createWorkers( + "settler", + 2, + async (job) => processSettlement(job.data), + ); + + logger.info( + `Pipeline started: ${WORKER_COUNT} block-prover, ${WORKER_COUNT} aggregator, 2 settler workers`, + ); + } + + private createWorkers( + queueName: string, + count: number, + processor: (job: any) => Promise, + ): void { + for (let i = 0; i < count; i++) { + const worker = new Worker(queueName, processor, { + connection, + concurrency: 1, + lockDuration: WORKER_TIMEOUT_MS, + stalledInterval: STALLED_INTERVAL_MS, + }); + + worker.on("completed", (job) => { + logger.info(`${queueName} worker ${i} completed job ${job.id}`); + }); + + worker.on("failed", (job, err) => { + logger.error( + `${queueName} worker ${i} failed job ${job?.id} (attempt ${job?.attemptsMade}/${job?.opts.attempts})`, + err as Error, + { jobId: job?.id, data: job?.data }, + ); + }); + + worker.on("error", (err) => { + logger.error( + `${queueName} worker ${i} error`, + err as Error, + ); + }); + + this.workers.push(worker); + } + } + + async shutdown(): Promise { + logger.info("Shutting down pipeline workers..."); + await Promise.all(this.workers.map((w) => w.close())); + logger.info("All pipeline workers shut down."); + } +} + +export async function startPipeline(): Promise { + const manager = new 
PipelineManager(); + + const shutdown = async () => { + logger.info("Received shutdown signal"); + await manager.shutdown(); + process.exit(0); + }; + + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); + + await manager.start(); +} diff --git a/prover_v2/src/modules/processors/recovery.ts b/prover_v2/src/modules/processors/recovery.ts new file mode 100644 index 00000000..f4202228 --- /dev/null +++ b/prover_v2/src/modules/processors/recovery.ts @@ -0,0 +1,113 @@ +import { BlockEpochModel, ProofEpochModel } from "../db/index.js"; +import { + BLOCK_EPOCH_SIZE, + PROOF_EPOCH_LEAF_COUNT, + PROOF_EPOCH_SETTLEMENT_INDEX, +} from "../utils/constants.js"; +import { blockProverQ, aggregatorQ, settlerQ } from "./utils/queue.js"; +import { + DEFAULT_JOB_OPTIONS, + blockProverJobId, + aggregatorJobId, + settlerJobId, +} from "./utils/jobOptions.js"; +import logger from "../../logger.js"; + +/** + * Startup recovery sweep. + * + * Scans MongoDB for work that should have been enqueued but wasn't + * (e.g., server crashed between proof storage and job enqueue). + * + * Safe to run at every startup because deterministic job IDs prevent duplicates. + */ +export async function runStartupRecovery(): Promise { + logger.info("Running startup recovery sweep..."); + + let blockProverJobs = 0; + let aggregatorJobs = 0; + let settlerJobs = 0; + + // 1. Find full BlockEpochs that don't have a completed ProofEpoch + const fullBlockEpochs = await BlockEpochModel.find({ + blocks: { $not: { $elemMatch: { $eq: null } } }, + }); + + for (const epoch of fullBlockEpochs) { + const leafIndex = + (epoch.height / BLOCK_EPOCH_SIZE) % PROOF_EPOCH_LEAF_COUNT; + const proofEpoch = await ProofEpochModel.findOne({ + height: epoch.height, + }); + + if (!proofEpoch || !proofEpoch.proofs[leafIndex]) { + await blockProverQ.add( + "block-prover", + { height: epoch.height }, + { + jobId: blockProverJobId(epoch.height), + ...DEFAULT_JOB_OPTIONS, + }, + ); + blockProverJobs++; + } + } + + // 2. 
Find ProofEpochs with incomplete aggregations + const proofEpochs = await ProofEpochModel.find({ settled: false }); + + for (const pe of proofEpochs) { + // Check sibling pairs for missing parent aggregations + for (let i = 0; i < pe.proofs.length - 1; i += 2) { + if (pe.proofs[i] && pe.proofs[i + 1]) { + const parentIndex = + PROOF_EPOCH_LEAF_COUNT + Math.floor(i / 2); + + if ( + parentIndex <= PROOF_EPOCH_SETTLEMENT_INDEX && + !pe.proofs[parentIndex] + ) { + const aggIndex = parentIndex - PROOF_EPOCH_LEAF_COUNT; + await aggregatorQ.add( + "aggregator", + { + height: pe.height, + index: aggIndex, + left: pe.proofs[i]!.toString(), + right: pe.proofs[i + 1]!.toString(), + }, + { + jobId: aggregatorJobId(pe.height, aggIndex), + ...DEFAULT_JOB_OPTIONS, + }, + ); + aggregatorJobs++; + } + } + } + + // 3. Check if root proof exists but not settled + if (pe.proofs[PROOF_EPOCH_SETTLEMENT_INDEX] && !pe.settled) { + await settlerQ.add( + "settler", + { + height: pe.height, + settlementProofId: + pe.proofs[PROOF_EPOCH_SETTLEMENT_INDEX]!.toString(), + }, + { + jobId: settlerJobId(pe.height), + ...DEFAULT_JOB_OPTIONS, + }, + ); + settlerJobs++; + } + } + + logger.info("Startup recovery sweep completed", { + blockProverJobs, + aggregatorJobs, + settlerJobs, + event: "recovery_sweep_complete", + }); +} diff --git a/prover_v2/src/modules/processors/settler/index.ts b/prover_v2/src/modules/processors/settler/index.ts index 6439807d..982b35f0 100644 --- a/prover_v2/src/modules/processors/settler/index.ts +++ b/prover_v2/src/modules/processors/settler/index.ts @@ -1,3 +1 @@ -import { masterRunner } from "./master"; - -export { masterRunner }; +export { worker } from "./worker.js"; diff --git a/prover_v2/src/modules/processors/settler/master.test.ts b/prover_v2/src/modules/processors/settler/master.test.ts deleted file mode 100644 index f06c916b..00000000 --- a/prover_v2/src/modules/processors/settler/master.test.ts +++ /dev/null @@ -1,119 +0,0 @@ -import { describe, it, expect, vi, 
beforeEach } from "vitest"; -import { Types } from "mongoose"; -import { - MASTER_SLEEP_INTERVAL_MS, - PROOF_EPOCH_SETTLEMENT_INDEX, -} from "../../utils/constants.js"; - -vi.mock("../../db/index.js", () => ({ - ProofEpochModel: { - findOneAndUpdate: vi.fn(), - updateOne: vi.fn(), - }, - incrementProofEpochFailCount: vi.fn(), -})); - -vi.mock("../utils/queue.js", () => ({ - settlerQ: { - add: vi.fn(), - }, -})); - -vi.mock("../utils/workerConnection.js", () => ({ - connection: {}, -})); - -vi.mock("./worker.js", () => ({ - worker: vi.fn(), -})); - -vi.mock("../../utils/functions.js", () => ({ - sleep: vi.fn(), -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { ProofEpochModel } from "../../db/index.js"; -import { settlerQ } from "../utils/queue.js"; -import { sleep } from "../../utils/functions.js"; -import { SettlerMaster } from "./master.js"; - -describe("settler master", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("queues settlement job when epoch found", async () => { - const settlementProofId = new Types.ObjectId(); - const proofs = Array(PROOF_EPOCH_SETTLEMENT_INDEX + 1).fill(null); - proofs[PROOF_EPOCH_SETTLEMENT_INDEX] = settlementProofId; - - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 20, - proofs, - kind: "blockProof", - } as any); - - const m = new SettlerMaster() as any; - await m.handleTask(); - - expect(settlerQ.add).toHaveBeenCalledWith("settler", { - height: 20, - settlementProofId: settlementProofId.toString(), - }); - expect(sleep).not.toHaveBeenCalled(); - }); - - it("sleeps when no epoch", async () => { - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue(null as any); - - const m = new SettlerMaster() as any; - await m.handleTask(); - - expect(settlerQ.add).not.toHaveBeenCalled(); - expect(sleep).toHaveBeenCalledWith(MASTER_SLEEP_INTERVAL_MS); - }); - - it("rolls back kind when 
queue add fails", async () => { - const settlementProofId = new Types.ObjectId(); - const proofs = Array(PROOF_EPOCH_SETTLEMENT_INDEX + 1).fill(null); - proofs[PROOF_EPOCH_SETTLEMENT_INDEX] = settlementProofId; - - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 20, - proofs, - kind: "blockProof", - } as any); - vi.mocked(settlerQ.add).mockRejectedValueOnce(new Error("queue error")); - - const m = new SettlerMaster() as any; - await expect(m.handleTask()).rejects.toThrow("queue error"); - - expect(ProofEpochModel.updateOne).toHaveBeenCalledWith( - { height: 20, kind: "settlement" }, - { $set: { kind: "blockProof" } }, - ); - }); - - it("sleeps when settlement proof id is missing", async () => { - const proofs = Array(PROOF_EPOCH_SETTLEMENT_INDEX + 1).fill(null); - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({ - height: 20, - proofs, - } as any); - - const m = new SettlerMaster() as any; - await m.handleTask(); - - expect(settlerQ.add).not.toHaveBeenCalled(); - expect(sleep).toHaveBeenCalledWith(MASTER_SLEEP_INTERVAL_MS); - }); -}); - diff --git a/prover_v2/src/modules/processors/settler/master.ts b/prover_v2/src/modules/processors/settler/master.ts deleted file mode 100644 index 4fc236fd..00000000 --- a/prover_v2/src/modules/processors/settler/master.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { - PROOF_EPOCH_SETTLEMENT_INDEX, - WORKER_COUNT, - WORKER_TIMEOUT_MS, - STALLED_INTERVAL_MS, - MASTER_SLEEP_INTERVAL_MS, -} from "../../utils/constants.js"; -import { ProofKind } from "../../db/types.js"; -import { - incrementProofEpochFailCount, - ProofEpochModel, -} from "../../db/index.js"; -import { Master } from "../base/Master.js"; -import { settlerQ } from "../utils/queue.js"; -import { SettlerJob } from "../utils/jobs.js"; -import { connection } from "../utils/workerConnection.js"; -import { worker as processSettlement } from "./worker.js"; -import { sleep } from "../../utils/functions.js"; -import logger from 
"../../../logger.js"; - -export class SettlerMaster extends Master { - constructor() { - super({ - queueName: "settler", - workerLabel: "Settler", - connection, - workerCount: WORKER_COUNT, - lockDurationMs: WORKER_TIMEOUT_MS, - stalledIntervalMs: STALLED_INTERVAL_MS, - processJob: async (_workerId, job) => { - await processSettlement(job.data); - }, - onJobFailed: async (job) => { - if (job?.data.height) { - await incrementProofEpochFailCount(job.data.height); - } - }, - }); - } - - protected async handleTask(): Promise { - const epoch = await ProofEpochModel.findOneAndUpdate( - { - [`proofs.${PROOF_EPOCH_SETTLEMENT_INDEX}`]: { $ne: null }, - kind: { $nin: ["settlement", "done"] as ProofKind[] }, - timeoutAt: { $gt: new Date() }, - }, - { - $set: { kind: "settlement" as ProofKind }, - }, - { - sort: { timeoutAt: 1 }, - new: false, - }, - ); - - if (epoch) { - const settlementProofId = - epoch.proofs[PROOF_EPOCH_SETTLEMENT_INDEX]; - if (!settlementProofId) { - await sleep(MASTER_SLEEP_INTERVAL_MS); - return; - } - - try { - await settlerQ.add("settler", { - height: epoch.height, - settlementProofId: settlementProofId.toString(), - }); - logger.debug( - `Pushed settler job to queue for epoch at height ${epoch.height}`, - { epochHeight: epoch.height, event: "settler_task_queued" }, - ); - } catch (error) { - await ProofEpochModel.updateOne( - { height: epoch.height, kind: "settlement" as ProofKind }, - { $set: { kind: epoch.kind } }, - ); - throw error; - } - } else { - await sleep(MASTER_SLEEP_INTERVAL_MS); - } - } -} - -export async function masterRunner() { - const master = new SettlerMaster(); - await master.run(); -} diff --git a/prover_v2/src/modules/processors/settler/worker.test.ts b/prover_v2/src/modules/processors/settler/worker.test.ts deleted file mode 100644 index 2fba754d..00000000 --- a/prover_v2/src/modules/processors/settler/worker.test.ts +++ /dev/null @@ -1,123 +0,0 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; - 
-vi.mock("../../db/models/proofEpoch/ProofEpoch.js", () => ({ - ProofEpochModel: { - findOne: vi.fn(), - findOneAndUpdate: vi.fn(), - }, -})); - -vi.mock("../../db/models/proof/utils.js", () => ({ - getProof: vi.fn(), -})); - -vi.mock("pulsar-contracts", () => ({ - SettlementContract: function SettlementContract(this: any) { - this.settle = vi.fn().mockResolvedValue(undefined); - }, - SettlementProof: { - fromJSON: vi.fn(async () => ({})), - }, -})); - -vi.mock("o1js", () => ({ - Mina: { - Network: vi.fn(() => ({})), - setActiveInstance: vi.fn(), - }, - PublicKey: { - fromBase58: vi.fn(() => ({})), - }, - fetchAccount: vi.fn().mockResolvedValue(undefined), -})); - -vi.mock("dotenv", () => ({ - default: { config: vi.fn() }, -})); - -vi.mock("../../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -import { ProofEpochModel } from "../../db/models/proofEpoch/ProofEpoch.js"; -import { getProof } from "../../db/models/proof/utils.js"; -import { worker } from "./worker.js"; - -describe("settler worker", () => { - beforeEach(() => { - vi.clearAllMocks(); - process.env.CONTRACT_ADDRESS = "B62qtest"; - process.env.REMOTE_SERVER_URL = "remote"; - }); - - it("throws when epoch not found", async () => { - vi.mocked(ProofEpochModel.findOne).mockResolvedValue(null as any); - - await expect( - worker({ height: 10, settlementProofId: "507f1f77bcf86cd799439011" } as any), - ).rejects.toThrow("ProofEpoch at height 10 not found."); - }); - - it("skips when already done after failure", async () => { - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - height: 10, - failCount: 1, - kind: "done", - } as any); - - await worker({ height: 10, settlementProofId: "507f1f77bcf86cd799439011" } as any); - - expect(getProof).not.toHaveBeenCalled(); - expect(ProofEpochModel.findOneAndUpdate).not.toHaveBeenCalled(); - }); - - it("throws when settlement proof is missing", async () => { - 
vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - height: 10, - failCount: 0, - kind: "settlement", - } as any); - vi.mocked(getProof).mockResolvedValue(null as any); - - await expect( - worker({ height: 10, settlementProofId: "507f1f77bcf86cd799439011" } as any), - ).rejects.toThrow("Settlement proof is missing."); - }); - - it("throws when contract address missing", async () => { - delete process.env.CONTRACT_ADDRESS; - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - height: 10, - failCount: 0, - kind: "settlement", - } as any); - vi.mocked(getProof).mockResolvedValue({} as any); - - await expect( - worker({ height: 10, settlementProofId: "507f1f77bcf86cd799439011" } as any), - ).rejects.toThrow("Contract address is not specified"); - }); - - it("sets epoch done after successful settlement", async () => { - vi.mocked(ProofEpochModel.findOne).mockResolvedValue({ - height: 10, - failCount: 0, - kind: "settlement", - } as any); - vi.mocked(getProof).mockResolvedValue({} as any); - vi.mocked(ProofEpochModel.findOneAndUpdate).mockResolvedValue({} as any); - - await worker({ height: 10, settlementProofId: "507f1f77bcf86cd799439011" } as any); - - expect(ProofEpochModel.findOneAndUpdate).toHaveBeenCalledWith( - { height: 10, kind: "settlement" }, - { $set: { kind: "done" } }, - ); - }); -}); - diff --git a/prover_v2/src/modules/processors/settler/worker.ts b/prover_v2/src/modules/processors/settler/worker.ts index e68b8833..14fa7b7c 100644 --- a/prover_v2/src/modules/processors/settler/worker.ts +++ b/prover_v2/src/modules/processors/settler/worker.ts @@ -1,7 +1,6 @@ import { Types } from "mongoose"; import { ProofEpochModel } from "../../db/models/proofEpoch/ProofEpoch.js"; import { getProof } from "../../db/models/proof/utils.js"; -import { ProofKind } from "../../db/types.js"; import { SettlementContract, SettlementProof } from "pulsar-contracts"; import { Mina, PublicKey, fetchAccount } from "o1js"; import dotenv from "dotenv"; @@ -16,9 +15,10 @@ 
export async function worker(task: SettlerJob) { throw new Error(`ProofEpoch at height ${task.height} not found.`); } - if (epoch.failCount > 0 && epoch.kind === "done") { + // Idempotency: skip if already settled + if (epoch.settled) { logger.info( - `Skipping settlement for epoch at height ${task.height} because it is already marked as done.`, + `Skipping settlement for epoch at height ${task.height} because it is already settled.`, ); return; } @@ -52,43 +52,18 @@ export async function worker(task: SettlerJob) { await fetchAccount({ publicKey: contractInstance.address }); - await contractInstance - .settle(settlementProof) - .then(async () => { - logger.info( - `Settlement proof for epoch at height ${task.height} submitted to the contract.`, - ); + await contractInstance.settle(settlementProof); - await setProofEpochDone(task.height); - }) - .catch((error) => { - logger.error( - `Failed to submit settlement proof for epoch at height ${task.height}: ${error}`, - ); - throw error; - }); -} - -async function setProofEpochDone(height: number) { - const result = await ProofEpochModel.findOneAndUpdate( - { - height, - kind: "settlement" as ProofKind, - }, - { - $set: { - kind: "done" as ProofKind, - }, - }, + logger.info( + `Settlement proof for epoch at height ${task.height} submitted to the contract.`, ); - if (!result) { - throw new Error( - `Proof epoch at height ${height} not found or not in settlement state.`, - ); - } + await ProofEpochModel.updateOne( + { height: task.height }, + { $set: { settled: true } }, + ); logger.info( - `Proof epoch at height ${height} marked as done after settlement.`, + `Proof epoch at height ${task.height} marked as settled.`, ); } diff --git a/prover_v2/src/modules/processors/triggers.ts b/prover_v2/src/modules/processors/triggers.ts new file mode 100644 index 00000000..d2bd12ab --- /dev/null +++ b/prover_v2/src/modules/processors/triggers.ts @@ -0,0 +1,99 @@ +import { Types } from "mongoose"; + +import { PROOF_EPOCH_LEAF_COUNT, 
PROOF_EPOCH_SETTLEMENT_INDEX } from "../utils/constants.js";
+import { aggregatorQ, settlerQ } from "./utils/queue.js";
+import { DEFAULT_JOB_OPTIONS, aggregatorJobId, settlerJobId } from "./utils/jobOptions.js";
+import type { IProofEpoch } from "../db/index.js";
+import logger from "../../logger.js";
+
+/**
+ * After a proof is stored at `completedIndex` in the ProofEpoch,
+ * check if its sibling also exists. If so, enqueue the aggregation job
+ * that merges them into the parent node.
+ *
+ * Works for both leaf proofs (indices 0..LEAF_COUNT-1) and
+ * internal aggregated proofs (indices LEAF_COUNT..SETTLEMENT_INDEX-1).
+ */
+export async function tryEnqueueAggregation(
+  proofEpoch: IProofEpoch,
+  completedIndex: number,
+): Promise<void> {
+  const siblingIndex =
+    completedIndex % 2 === 0 ? completedIndex + 1 : completedIndex - 1;
+
+  if (!proofEpoch.proofs[siblingIndex]) {
+    return;
+  }
+
+  const parentProofIndex =
+    PROOF_EPOCH_LEAF_COUNT + Math.floor(completedIndex / 2);
+
+  if (parentProofIndex > PROOF_EPOCH_SETTLEMENT_INDEX) {
+    return;
+  }
+
+  const aggregationIndex = parentProofIndex - PROOF_EPOCH_LEAF_COUNT;
+
+  const leftIndex = Math.min(completedIndex, siblingIndex);
+  const rightIndex = Math.max(completedIndex, siblingIndex);
+
+  const leftId = proofEpoch.proofs[leftIndex] as Types.ObjectId;
+  const rightId = proofEpoch.proofs[rightIndex] as Types.ObjectId;
+
+  await aggregatorQ.add(
+    "aggregator",
+    {
+      height: proofEpoch.height,
+      index: aggregationIndex,
+      left: leftId.toString(),
+      right: rightId.toString(),
+    },
+    {
+      jobId: aggregatorJobId(proofEpoch.height, aggregationIndex),
+      ...DEFAULT_JOB_OPTIONS,
+    },
+  );
+
+  logger.debug(
+    `Enqueued aggregation job for epoch ${proofEpoch.height}, index ${aggregationIndex}`,
+    {
+      epochHeight: proofEpoch.height,
+      aggregationIndex,
+      event: "aggregation_triggered",
+    },
+  );
+}
+
+/**
+ * After the root proof is produced (at PROOF_EPOCH_SETTLEMENT_INDEX),
+ * enqueue the settler job to submit it on-chain.
+ */
+export async function tryEnqueueSettlement(
+  proofEpoch: IProofEpoch,
+): Promise<void> {
+  const rootProofId = proofEpoch.proofs[PROOF_EPOCH_SETTLEMENT_INDEX];
+
+  if (!rootProofId || proofEpoch.settled) {
+    return;
+  }
+
+  await settlerQ.add(
+    "settler",
+    {
+      height: proofEpoch.height,
+      settlementProofId: rootProofId.toString(),
+    },
+    {
+      jobId: settlerJobId(proofEpoch.height),
+      ...DEFAULT_JOB_OPTIONS,
+    },
+  );
+
+  logger.debug(
+    `Enqueued settler job for epoch ${proofEpoch.height}`,
+    {
+      epochHeight: proofEpoch.height,
+      event: "settlement_triggered",
+    },
+  );
+}
diff --git a/prover_v2/src/modules/processors/utils/jobOptions.ts b/prover_v2/src/modules/processors/utils/jobOptions.ts
new file mode 100644
index 00000000..638dcecc
--- /dev/null
+++ b/prover_v2/src/modules/processors/utils/jobOptions.ts
@@ -0,0 +1,28 @@
+import type { JobsOptions } from "bullmq";
+
+export const DEFAULT_JOB_OPTIONS: JobsOptions = {
+  attempts: 3,
+  backoff: {
+    type: "exponential",
+    delay: 10_000,
+  },
+  removeOnComplete: {
+    age: 24 * 3600,
+    count: 1000,
+  },
+  removeOnFail: {
+    age: 7 * 24 * 3600,
+  },
+};
+
+export function blockProverJobId(height: number): string {
+  return `bp:${height}`;
+}
+
+export function aggregatorJobId(height: number, index: number): string {
+  return `agg:${height}:${index}`;
+}
+
+export function settlerJobId(height: number): string {
+  return `settle:${height}`;
+}
diff --git a/prover_v2/src/modules/pulsar/utils.test.ts b/prover_v2/src/modules/pulsar/utils.test.ts
deleted file mode 100644
index df707d15..00000000
--- a/prover_v2/src/modules/pulsar/utils.test.ts
+++ /dev/null
@@ -1,572 +0,0 @@
-import { describe, it, expect, vi, beforeEach } from "vitest";
-import { PublicKey } from "o1js";
-import {
-    parseTendermintBlockResponse,
-    decodeMinaSignature,
-    parseValidatorSetResponse,
-    computeValidatorListHash,
-    getLatestHeight,
-    getBlockData,
-    getMinaPubKeyFromCosmosAddress,
-    getCosmosValidatorSet,
-    getValidatorSet,
-    getVoteExt,
-    
storePulsarBlock, -} from "./utils.js"; -import * as db from "../db/index.js"; - -vi.mock("../db/index.js"); -vi.mock("../../logger.js", () => ({ - default: { - info: vi.fn(), - error: vi.fn(), - warn: vi.fn(), - debug: vi.fn(), - }, -})); - -describe("pulsar utils", () => { - describe("parseTendermintBlockResponse", () => { - it("parses block header and returns blockHash, height, chainId", () => { - const res = { - block: { - header: { - height: "42", - chain_id: "pulsar-test", - proposer_address: "cosmos1abc", - app_hash: - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", - time: { seconds: "1000", nanos: 0 }, - data_hash: "", - validators_hash: "", - consensus_hash: "", - evidence_hash: "", - last_commit_hash: "", - last_results_hash: "", - next_validators_hash: "", - }, - data: { txs: [] }, - last_commit: { signatures: [] }, - }, - block_id: { hash: "deadbeef" }, - }; - - const result = parseTendermintBlockResponse(res); - - expect(result.height).toBe(42); - expect(result.blockHash).toBe("0"); - expect(result.chainId).toBe("pulsar-test"); - expect(result.proposerAddress).toBe("cosmos1abc"); - expect(result.blockId).toBe("deadbeef"); - expect(result.txs).toEqual([]); - }); - - it("handles missing optional fields", () => { - const res = { - block: { - header: { - height: "1", - app_hash: - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", - }, - data: {}, - last_commit: {}, - }, - }; - - const result = parseTendermintBlockResponse(res); - - expect(result.height).toBe(1); - expect(result.chainId).toBe(""); - expect(result.proposerAddress).toBe(""); - }); - }); - - describe("parseValidatorSetResponse", () => { - it("extracts validator addresses from response", () => { - const res = { - validators: [ - { address: "cosmos1addr1" }, - { address: "cosmos1addr2" }, - ], - }; - - const result = parseValidatorSetResponse(res); - - expect(result).toEqual(["cosmos1addr1", "cosmos1addr2"]); - }); - - it("returns empty array for empty validators", () => { - const res = { 
validators: [] }; - const result = parseValidatorSetResponse(res); - expect(result).toEqual([]); - }); - }); - - describe("decodeMinaSignature", () => { - it("decodes 64-byte hex to base58 signature", () => { - const rHex = "0".repeat(64); - const sHex = "1".repeat(64); - const sigHex = rHex + sHex; - - const result = decodeMinaSignature(sigHex); - - expect(typeof result).toBe("string"); - expect(result.length).toBeGreaterThan(0); - }); - }); - - describe("computeValidatorListHash", () => { - it("returns hash string for validator list", () => { - const validators = [ - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ]; - - const result = computeValidatorListHash(validators); - - expect(typeof result).toBe("string"); - expect(result.length).toBeGreaterThan(0); - }); - - it("returns same hash for same validators", () => { - const validators = [ - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ]; - - const a = computeValidatorListHash(validators); - const b = computeValidatorListHash(validators); - - expect(a).toBe(b); - }); - }); - - describe("getLatestHeight", () => { - it("returns latest block height from Tendermint client", async () => { - const mockTmClient = { - GetLatestBlock: vi.fn((req, callback) => { - callback(null, { - block: { - header: { - height: "100", - app_hash: - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", - }, - data: { txs: [] }, - last_commit: { signatures: [] }, - }, - }); - }), - }; - - const height = await getLatestHeight(mockTmClient); - - expect(height).toBe(100); - expect(mockTmClient.GetLatestBlock).toHaveBeenCalledWith( - {}, - expect.any(Function), - ); - }); - - it("rejects on gRPC error", async () => { - const mockTmClient = { - GetLatestBlock: vi.fn((req, callback) => { - callback(new Error("gRPC error"), null); - }), - }; - - await expect(getLatestHeight(mockTmClient)).rejects.toThrow( - "gRPC error", - ); - }); - }); - - describe("getCosmosValidatorSet", () => { - it("returns validator addresses from 
Tendermint client", async () => { - const mockTmClient = { - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(null, { - validators: [ - { address: "cosmos1addr1" }, - { address: "cosmos1addr2" }, - ], - }); - }), - }; - - const validators = await getCosmosValidatorSet(mockTmClient, 100); - - expect(validators).toEqual(["cosmos1addr1", "cosmos1addr2"]); - expect(mockTmClient.GetValidatorSetByHeight).toHaveBeenCalledWith( - { height: "100" }, - expect.any(Function), - ); - }); - - it("rejects on gRPC error", async () => { - const mockTmClient = { - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(new Error("gRPC error"), null); - }), - }; - - await expect( - getCosmosValidatorSet(mockTmClient, 100), - ).rejects.toThrow("gRPC error"); - }); - }); - - describe("getMinaPubKeyFromCosmosAddress", () => { - it("retrieves Mina public key for Cosmos address", async () => { - const mockPubkey = PublicKey.fromBase58( - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ); - const mockMkClient = { - KeyStore: vi.fn((req, callback) => { - callback(null, { - keyStore: { - minaPublicKey: "encoded_address", - }, - }); - }), - GetMinaPubkey: vi.fn((req, callback) => { - callback(null, { - x: mockPubkey.toFields()[0].toString(), - is_odd: - mockPubkey.toFields()[1].toString() === "1" - ? 
"true" - : "false", - }); - }), - }; - - const pubkey = await getMinaPubKeyFromCosmosAddress( - mockMkClient, - "cosmos1addr", - ); - - expect(typeof pubkey).toBe("string"); - expect(pubkey.length).toBeGreaterThan(0); - expect(mockMkClient.KeyStore).toHaveBeenCalledWith( - { index: "cosmos1addr" }, - expect.any(Function), - ); - }); - - it("rejects when no Mina public key found", async () => { - const mockMkClient = { - KeyStore: vi.fn((req, callback) => { - callback(null, { - keyStore: {}, - }); - }), - }; - - await expect( - getMinaPubKeyFromCosmosAddress(mockMkClient, "cosmos1addr"), - ).rejects.toThrow("No Mina public key found"); - }); - }); - - describe("getValidatorSet", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("converts Cosmos validators to Mina public keys", async () => { - const mockPubkey = PublicKey.fromBase58( - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ); - const mockTmClient = { - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(null, { - validators: [ - { address: "cosmos1addr1" }, - { address: "cosmos1addr2" }, - ], - }); - }), - }; - const mockMkClient = { - KeyStore: vi.fn((req, callback) => { - callback(null, { - keyStore: { - minaPublicKey: "encoded_address", - }, - }); - }), - GetMinaPubkey: vi.fn((req, callback) => { - callback(null, { - x: mockPubkey.toFields()[0].toString(), - is_odd: - mockPubkey.toFields()[1].toString() === "1" - ? 
"true" - : "false", - }); - }), - }; - - const validators = await getValidatorSet( - mockTmClient, - mockMkClient, - 100, - ); - - expect(validators).toHaveLength(2); - expect(validators.every((v) => typeof v === "string")).toBe(true); - }); - - it("throws error when no validators found", async () => { - const mockTmClient = { - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(null, { - validators: [], - }); - }), - }; - const mockMkClient = {}; - - await expect( - getValidatorSet(mockTmClient, mockMkClient, 100), - ).rejects.toThrow("No validators found"); - }); - - it("continues when individual validator key retrieval fails", async () => { - const mockPubkey = PublicKey.fromBase58( - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ); - const mockTmClient = { - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(null, { - validators: [ - { address: "cosmos1addr1" }, - { address: "cosmos1addr2" }, - ], - }); - }), - }; - const mockMkClient = { - KeyStore: vi.fn((req, callback) => { - if (req.index === "cosmos1addr1") { - callback(new Error("Key not found"), null); - } else { - callback(null, { - keyStore: { - minaPublicKey: "encoded_address", - }, - }); - } - }), - GetMinaPubkey: vi.fn((req, callback) => { - callback(null, { - x: mockPubkey.toFields()[0].toString(), - is_odd: - mockPubkey.toFields()[1].toString() === "1" - ? 
"true" - : "false", - }); - }), - }; - - const validators = await getValidatorSet( - mockTmClient, - mockMkClient, - 100, - ); - - expect(validators.length).toBeGreaterThanOrEqual(0); - }); - }); - - describe("getVoteExt", () => { - it("retrieves vote extensions with pagination", async () => { - const mockPubkey = PublicKey.fromBase58( - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ); - let callCount = 0; - const mockMkClient = { - VoteExtByHeight: vi.fn((req, callback) => { - callCount++; - if (callCount === 1) { - callback(null, { - voteExt: [ - { - index: "0", - height: "100", - validatorAddr: "encoded1", - signature: "0".repeat(128), - }, - ], - pagination: { next_key: Buffer.from("next") }, - }); - } else { - callback(null, { - voteExt: [ - { - index: "1", - height: "100", - validatorAddr: "encoded2", - signature: "1".repeat(128), - }, - ], - pagination: { next_key: null }, - }); - } - }), - GetMinaPubkey: vi.fn((req, callback) => { - callback(null, { - x: mockPubkey.toFields()[0].toString(), - is_odd: - mockPubkey.toFields()[1].toString() === "1" - ? 
"true" - : "false", - }); - }), - }; - - const voteExt = await getVoteExt(mockMkClient, 100); - - expect(voteExt.length).toBeGreaterThanOrEqual(1); - expect(mockMkClient.VoteExtByHeight).toHaveBeenCalledTimes(2); - }); - - it("handles empty vote extensions", async () => { - const mockMkClient = { - VoteExtByHeight: vi.fn((req, callback) => { - callback(null, { - voteExt: [], - pagination: { next_key: null }, - }); - }), - }; - - const voteExt = await getVoteExt(mockMkClient, 100); - - expect(voteExt).toEqual([]); - }); - - it("rejects on gRPC error", async () => { - const mockMkClient = { - VoteExtByHeight: vi.fn((req, callback) => { - callback(new Error("gRPC error"), null); - }), - }; - - await expect(getVoteExt(mockMkClient, 100)).rejects.toThrow( - "gRPC error", - ); - }); - }); - - describe("getBlockData", () => { - it("retrieves complete block data", async () => { - const mockPubkey = PublicKey.fromBase58( - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ); - const mockTmClient = { - GetBlockByHeight: vi.fn((req, callback) => { - callback(null, { - block: { - header: { - height: "100", - app_hash: - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", - }, - data: { txs: [] }, - last_commit: { signatures: [] }, - }, - }); - }), - GetValidatorSetByHeight: vi.fn((req, callback) => { - callback(null, { - validators: [{ address: "cosmos1addr" }], - }); - }), - }; - const mockMkClient = { - KeyStore: vi.fn((req, callback) => { - callback(null, { - keyStore: { - minaPublicKey: "encoded_address", - }, - }); - }), - GetMinaPubkey: vi.fn((req, callback) => { - callback(null, { - x: mockPubkey.toFields()[0].toString(), - is_odd: - mockPubkey.toFields()[1].toString() === "1" - ? 
"true" - : "false", - }); - }), - VoteExtByHeight: vi.fn((req, callback) => { - callback(null, { - voteExt: [], - pagination: { next_key: null }, - }); - }), - }; - - const blockData = await getBlockData( - mockTmClient, - mockMkClient, - 100, - ); - - expect(blockData.height).toBe(100); - expect(blockData.stateRoot).toBeDefined(); - expect(Array.isArray(blockData.validators)).toBe(true); - expect(Array.isArray(blockData.voteExt)).toBe(true); - }); - }); - - describe("storePulsarBlock", () => { - beforeEach(() => { - vi.clearAllMocks(); - }); - - it("stores block with validator list hash", async () => { - const blockData = { - height: 100, - stateRoot: "0x123", - validators: [ - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ], - voteExt: [], - }; - - vi.mocked(db.storeBlock).mockResolvedValue(undefined); - - await storePulsarBlock(blockData); - - expect(db.storeBlock).toHaveBeenCalledWith( - expect.objectContaining({ - height: 100, - stateRoot: "0x123", - validators: blockData.validators, - validatorListHash: expect.any(String), - }), - ); - }); - - it("computes validator list hash correctly", async () => { - const validators = [ - "B62qmiWoAewYZuz7tUL1yV8r718dyLhp7Ck83ckuPAhPioERpTTMNNb", - ]; - const blockData = { - height: 100, - stateRoot: "0x123", - validators, - voteExt: [], - }; - - vi.mocked(db.storeBlock).mockResolvedValue(undefined); - - await storePulsarBlock(blockData); - - const callArgs = vi.mocked(db.storeBlock).mock.calls[0][0]; - const expectedHash = computeValidatorListHash(validators); - expect(callArgs.validatorListHash).toBe(expectedHash); - }); - }); -}); diff --git a/prover_v2/src/modules/pulsar/utils.ts b/prover_v2/src/modules/pulsar/utils.ts index 1c480e0c..bbe7fa2d 100644 --- a/prover_v2/src/modules/pulsar/utils.ts +++ b/prover_v2/src/modules/pulsar/utils.ts @@ -4,7 +4,10 @@ import { List } from "pulsar-contracts"; import { Poseidon, PublicKey, Signature } from "o1js"; import logger from "../../logger.js"; -import { 
storeBlock } from "../db/index.js"; +import { storeBlock, storeBlockInBlockEpoch } from "../db/index.js"; +import { BLOCK_EPOCH_SIZE } from "../utils/constants.js"; +import { blockProverQ } from "../processors/utils/queue.js"; +import { DEFAULT_JOB_OPTIONS, blockProverJobId } from "../processors/utils/jobOptions.js"; import { BlockParserResult, BlockData, @@ -402,12 +405,39 @@ async function storePulsarBlock(blockData: BlockData) { const validatorListHash = computeValidatorListHash(validators); - await storeBlock({ + const blockDoc = await storeBlock({ ...rest, validators, validatorListHash, }); + // Store block into its epoch + const index = blockData.height % BLOCK_EPOCH_SIZE; + const epoch = await storeBlockInBlockEpoch( + blockData.height, + blockDoc._id, + index, + ); + + // If epoch is full, trigger block-prover + const isEpochFull = epoch.blocks.every((b) => b !== null); + if (isEpochFull) { + const epochHeight = + Math.floor(blockData.height / BLOCK_EPOCH_SIZE) * BLOCK_EPOCH_SIZE; + await blockProverQ.add( + "block-prover", + { height: epochHeight }, + { + jobId: blockProverJobId(epochHeight), + ...DEFAULT_JOB_OPTIONS, + }, + ); + logger.info("Epoch full, block-prover job enqueued", { + epochHeight, + event: "block_prover_triggered", + }); + } + logger.info("Stored Pulsar block", { blockHeight: blockData.height, validatorsCount: validators.length, diff --git a/prover_v2/src/modules/utils/constants.ts b/prover_v2/src/modules/utils/constants.ts index 24825558..bb2488ef 100644 --- a/prover_v2/src/modules/utils/constants.ts +++ b/prover_v2/src/modules/utils/constants.ts @@ -1,5 +1,4 @@ // Processors constants -export const MASTER_SLEEP_INTERVAL_MS = 1000; // 1 second export const WORKER_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes export const STALLED_INTERVAL_MS = 5000; // 5 seconds export const BLOCK_EPOCH_SIZE = 8; @@ -14,7 +13,6 @@ export const TENDERMINT_SERVICE_NAME = "cosmos.base.tendermint.v1beta1.Service"; export const MINA_KEYS_SERVICE_NAME = 
"interchain_security.minakeys.Query"; // Monitor constants -export const MAX_FAIL_COUNT = 3; export const MONITOR_INTERVAL_MS = 30_000; // 30 seconds // Cleanup constants