## Description

### Context
The Midaz transaction layer processes RabbitMQ messages for async transaction persistence. Each worker consumes messages from the queue and persists transactions and their operations to PostgreSQL. The system uses Squirrel query builder for database operations and supports configurable workers and prefetch settings.
### Problem/Motivation
The current implementation processes messages one transaction at a time, with each transaction's operations inserted individually in a loop. This creates significant database round-trip overhead that limits throughput.
Example: a bulk of 50 transactions with 5 operations each currently requires:
- 50 transaction INSERTs
- 250 operation INSERTs
- 300 total database round-trips
- 50 individual RabbitMQ ACKs
### Current Behavior
| Behavior | Impact |
|---|---|
| 1 message processed per worker iteration | Limited throughput |
| 1 INSERT per transaction | N database round-trips |
| 1 INSERT per operation (loop) | N × M round-trips for M operations |
| Individual ACK per message | RabbitMQ overhead |
### Goal
Implement bulk insert at the transaction layer by collecting multiple messages at the consumer level and performing bulk database inserts:
| Target Behavior | Benefit |
|---|---|
| N messages collected before processing | Higher throughput |
| 1 multi-row INSERT for N transactions | Reduced round-trips |
| 1 multi-row INSERT for all operations | Reduced round-trips |
| 1 bulk ACK for N messages | Reduced RabbitMQ overhead |
**Key Design Decisions:**
- Bulk mode activates only when `RABBITMQ_TRANSACTION_ASYNC=true`
- Bulk size defaults to `workers × prefetch` (harmony constraint)
- Uses the Squirrel query builder with `ON CONFLICT (id) DO NOTHING` for idempotency
- Falls back to individual inserts on bulk failure
- Metrics track `attempted`, `inserted`, and `ignored` counts
**Configuration (Environment Variables):**

| Variable | Type | Default | Description |
|---|---|---|---|
| `BULK_RECORDER_ENABLED` | bool | `true` | Enable bulk mode |
| `BULK_RECORDER_SIZE` | int | (derived) | Messages per bulk; defaults to `workers × prefetch` |
| `BULK_RECORDER_FLUSH_TIMEOUT_MS` | int | `100` | Max wait before flushing an incomplete bulk |
| `BULK_RECORDER_MAX_ROWS_PER_INSERT` | int | `1000` | Max rows per INSERT statement |
| `BULK_RECORDER_FALLBACK_ENABLED` | bool | `true` | Fall back to individual inserts on failure |
### Success Metrics
| Metric | Baseline | Target | Measurement |
|---|---|---|---|
| DB round-trips per 50 txns | 300 | 2-5 | Query count |
| Processing time per 50 txns | ~500ms | ~50ms | `bulk_recorder_bulk_duration_seconds` |
| Throughput (txns/sec) | ~100 | ~1,000+ | `bulk_recorder_transactions_inserted_total` rate |
| Duplicate detection | N/A | 100% | `ignored`/`attempted` ratio when duplicates exist |
**Expected Gain:** 10x+ performance improvement.