Skip to content

Fluxora-Org/Fluxora-Backend

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

453 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Contract Event Indexer - Batch Replay Implementation

High-performance contract event indexer with optimized batch processing and PostgreSQL indexing for efficient historical event replay.

πŸš€ Features

  • Batch Insert Processing: 50x faster than single-row inserts
  • Optimized Database Indexes: Composite and partial indexes for replay queries
  • Real-time Progress Tracking: Monitor replay status with estimated completion times
  • Transaction Safety: Full ACID compliance with automatic rollback
  • Security Hardened: Parameterized queries, input validation, concurrent operation prevention
  • Comprehensive Testing: 80%+ code coverage with edge case handling

πŸ“‹ Requirements

  • Node.js 18+
  • PostgreSQL 12+
  • pnpm (or npm/yarn)

πŸ› οΈ Installation

# Install dependencies
pnpm install

# Copy environment configuration
cp .env.example .env

# Edit .env with your database credentials
# DATABASE_URL=postgresql://user:password@localhost:5432/indexer_db

πŸ—„οΈ Database Setup

# Run migrations to create tables and indexes
pnpm run migrate

This executes tsx src/db/migrate.ts, which uses node-pg-migrate as the single migration runner. All files in migrations/ that begin with a numeric timestamp prefix are applied in ascending numeric order and recorded in the pgmigrations table.

Migration file naming convention

<timestamp>_<description>.ts
Range Purpose
1000000000000 – 1000000000999 Bootstrapping tables converted from the legacy PoolClient runner (000_*, 001_*, 002_*)
1774715131962 + Streams, audit, webhook-outbox, DLQ, and PII tables
20260601000000 + Calendar-style additions (pgcrypto, pagination indexes, …)

Files without a leading digit (e.g. run.ts) are ignored by the scanner and will never appear in the pgmigrations ledger.

Adding a new migration

# Pick the current Unix-ms timestamp as the prefix
date +%s%3N   # e.g. 1750000000000

# Create the file
touch migrations/1750000000000_your_description.ts

Implement the up(pgm: MigrationBuilder) and down(pgm: MigrationBuilder) functions using the node-pg-migrate API. The run.ts file in migrations/ is a tombstone that exits with an error β€” do not use it directly.

This creates:

  • historical_events table (source data)
  • contract_events table (replay destination)
  • Optimized indexes for replay queries
  • Composite stream pagination indexes (idx_streams_status_id, idx_streams_sender_id, idx_streams_contract_id, idx_streams_status_created_at_desc) β€” see docs/STREAMS.md

πŸƒ Running the Service

# Development mode
pnpm run dev

# Production build
pnpm run build
node dist/src/index.js

The service will start on port 3000 (configurable via PORT environment variable).

πŸ“‘ API Usage

Start a Replay

curl -X POST http://localhost:3000/internal/indexer/events/replay \
  -H "Content-Type: application/json" \
  -d '{
    "contract_id": "contract-abc-123",
    "ledger": 1,
    "from_block": 1000,
    "to_block": 2000
  }'

Response:

{
  "message": "Replay started",
  "status": {
    "isReplaying": true,
    "rowsReplayed": 0,
    "rowsRemaining": 1500,
    "totalRows": 1500,
    "estimatedCompletion": "2026-05-28T15:30:00.000Z",
    "startedAt": "2026-05-28T15:00:00.000Z"
  }
}

Check Replay Status

curl http://localhost:3000/internal/indexer/status

Response:

{
  "isReplaying": true,
  "rowsReplayed": 750,
  "rowsRemaining": 750,
  "totalRows": 1500,
  "estimatedCompletion": "2026-05-28T15:30:00.000Z",
  "startedAt": "2026-05-28T15:00:00.000Z",
  "contractId": "contract-abc-123",
  "ledger": 1
}

πŸ§ͺ Testing

# Run all tests
pnpm test

# Run with coverage report
pnpm test:coverage

# Run specific test file
pnpm test tests/indexer/service.replay.test.ts

Test Coverage

The test suite includes:

  • βœ… Input validation (invalid parameters)
  • βœ… Empty replay sets
  • βœ… Batch processing with various sizes
  • βœ… Batch boundary alignment
  • βœ… Duplicate event handling (ON CONFLICT)
  • βœ… Concurrent replay prevention
  • βœ… Transaction rollback on errors
  • βœ… Progress tracking and estimation
  • βœ… Block range filtering
  • βœ… SQL injection prevention

βš™οΈ Configuration

Environment Variables

Variable Default Description
DATABASE_URL - PostgreSQL connection string (required)
REPLAY_BATCH_SIZE 1000 Number of events per batch insert
PORT 3000 HTTP server port

Batch Size Tuning

  • Small (100-500): Lower memory, more round-trips
  • Medium (1000-2000): Balanced performance ⭐ recommended
  • Large (5000+): Faster bulk operations, higher memory

πŸ”’ Security

Implemented Protections

  1. SQL Injection Prevention: All queries use parameterized statements
  2. Input Validation: Strict validation of all request parameters
  3. Concurrent Operation Prevention: Only one replay at a time
  4. Transaction Safety: Automatic rollback on errors
  5. Webhook Delivery Logging: Outbound webhook dispatch logs use the shared structured logger and include only stable identifiers (deliveryId, eventType, attemptNumber) plus statusCode when available. Webhook secrets, raw payloads, signatures, and endpoint URLs are excluded from log metadata.

Webhook Delivery Logging

The class-based WebhookDispatcher imports the shared structured logger from src/lib/logger.ts and uses the same (message, correlationId?, meta?) signature as other services. Dispatch outcomes log only safe delivery metadata:

  • Success: deliveryId, eventType, attemptNumber, statusCode
  • Retryable HTTP failure: deliveryId, eventType, attemptNumber, statusCode
  • Permanent HTTP failure: deliveryId, eventType, attemptNumber, statusCode
  • Network failure: deliveryId, eventType, attemptNumber

Target URLs, webhook secrets, signatures, and raw payloads are intentionally omitted from log records to avoid leaking credentials or signed delivery content.

Production Recommendations

⚠️ The /internal/indexer/* endpoints are not authenticated by default.

Before deploying to production:

// Add authentication middleware
import { authenticate } from './middleware/auth';

app.use('/internal', authenticate);
app.use('/internal/indexer', indexerRouter);

Additional recommendations:

  • Implement IP whitelisting
  • Add rate limiting
  • Use API keys or JWT tokens
  • Enable HTTPS/TLS

πŸ“Š Performance

Batch Insert Performance

With REPLAY_BATCH_SIZE=1000:

  • Single inserts: ~100-200 events/second
  • Batch inserts: ~5,000-10,000 events/second

50x improvement in throughput.

Index Impact

For a table with 10M events:

  • Unindexed query: ~30-60 seconds
  • Indexed query: ~10-50 milliseconds

πŸ“š Documentation

See docs/indexer.md for comprehensive documentation including:

  • Detailed API reference
  • Database schema and indexes
  • Security considerations
  • Troubleshooting guide
  • Monitoring recommendations

πŸ—οΈ Architecture

src/
β”œβ”€β”€ config/          # Configuration management
β”œβ”€β”€ db/              # Database client and connection pooling
β”œβ”€β”€ indexer/         # Core replay service logic
β”œβ”€β”€ routes/          # Express route handlers
└── types/           # TypeScript type definitions

migrations/
β”œβ”€β”€ 000_initial_schema.ts              # Create tables
└── 001_add_contract_events_replay_indexes.ts  # Add indexes

tests/
└── indexer/
    └── service.replay.test.ts         # Comprehensive test suite

docs/
└── indexer.md                         # Full documentation

πŸ”„ Development Workflow

Suggested Execution

  1. Fork and branch:

    git checkout -b feature/indexer-replay-batching
  2. Implement changes: βœ… Complete

    • βœ… Batch insert logic in src/indexer/service.ts
    • βœ… Index migration in migrations/001_add_contract_events_replay_indexes.ts
    • βœ… Progress API in src/routes/indexer.ts
    • βœ… Comprehensive tests in tests/indexer/service.replay.test.ts
    • βœ… Documentation in docs/indexer.md
  3. Test:

    pnpm test:coverage
  4. Commit:

    git add .
    git commit -m "perf: batch contract-event replay inserts and add targeted DB indexes"

πŸ› Troubleshooting

Replay Times Out

  • Reduce REPLAY_BATCH_SIZE
  • Run during off-peak hours
  • Add database resources

High Memory Usage

  • Reduce REPLAY_BATCH_SIZE
  • Increase application heap size

Concurrent Replay Error

  • Check status: GET /internal/indexer/status
  • Wait for current replay to complete

See docs/indexer.md for detailed troubleshooting.

Webhook resilience

Outbound webhook retries use two Redis-backed per-consumer controls:

  • Rate limiting (src/redis/webhookRateLimit.ts): sliding-window cap via WEBHOOK_RETRY_RPS (default 10/s).
  • Circuit breaker (src/redis/webhookCircuitBreakerStore.ts): shared closed β†’ open β†’ half-open state keyed by SHA-256 hash of the consumer URL. After circuitBreakerThreshold consecutive failures, deliveries are deferred until circuitBreakerResetMs, then a single cross-instance probe is allowed.

attemptWebhookDeliveryWithRateLimit in src/webhooks/retry.ts applies both gates before each delivery. State transitions emit fluxora_webhook_circuit_breaker_transitions_total. See docs/webhooks.md for details.

Webhook Causal Ordering Guarantee

To ensure per-stream consumers always observe events in chain order, Fluxora guarantees a deterministic tiebreaker for webhook outbox deliveries. Items sharing the exact same scheduledFor timestamp are processed sequentially by applying a secondary sort on their ledger sequence (extracted from the JSON payload) and a tertiary sort on the eventId. This causal ordering is maintained securely across memory queues (WebhookDeliveryStore) and persistent database reads (PostgreSQL webhook_outbox query path), even when large sets of identical timestamps occur.

Indexer Stall State and Recovery

The service health checks monitor indexer freshness to ensure downstream clients do not receive dangerously stale chain state. If the sync lag exceeds the configured threshold (DEFAULT_INDEXER_STALL_THRESHOLD_MS), the system latches an isStalled flag and degrades /health/ready. Even if the indexer catches back up, the stall flag remains latched to alert operators of the transient instability. Operators can acknowledge and reset this flag via an admin endpoint:

curl -X POST http://localhost:3000/api/admin/indexer/stall/clear \
  -H "Authorization: Bearer <ADMIN_API_KEY>"

The endpoint returns 409 Conflict if the indexer is still actively lagging behind the freshness threshold. Successful resets emit an INDEXER_STALL_CLEARED audit log.

πŸ“ License

MIT

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes with tests
  4. Ensure tests pass: pnpm test
  5. Submit a pull request

Note: This implementation prioritizes performance, security, and maintainability. All code includes comprehensive comments and follows TypeScript best practices.

About

No description, website, or topics provided.

Resources

Security policy

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages