feat: implement bulk insert repository methods for transactions and operations - bulk recorder #1928
feat: implement bulk insert repository methods for transactions and operations - bulk recorder #1928
Conversation
…ions with associated tests ✨
…mock database implementation 🧪
… in operations and transactions 🧪
|
This PR is very large (8 files, 1732 lines changed). Consider breaking it into smaller PRs for easier review. |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (4)
WalkthroughThis pull request introduces bulk insertion capabilities for transactions and operations in the PostgreSQL adapter layer. New Sequence DiagramsequenceDiagram
participant Caller
participant Repo as Repository<br/>(CreateBulk)
participant Internal as createBulkInternal
participant Chunk as insertChunk
participant DB as DBExecutor<br/>(ExecContext)
Caller->>Repo: CreateBulk(ctx, operations)
rect rgba(100, 150, 200, 0.5)
Note over Repo: Input Validation & Preparation
Repo->>Internal: Short-circuit on empty input?
alt Empty/Nil Input
Repo-->>Caller: BulkInsertResult{0, 0, 0}, nil
else Non-Empty
Repo->>Internal: createBulkInternal(ctx, db, operations)
end
end
rect rgba(150, 100, 200, 0.5)
Note over Internal: Sort & Chunk Processing
Internal->>Internal: Validate no nil elements
Internal->>Internal: Sort by ID in-place
Internal->>Internal: Set Attempted = len(slice)
loop For each chunk (max 1000 rows)
Internal->>Chunk: insertOperationChunk(ctx, db, chunk)
Chunk->>DB: ExecContext(INSERT ... ON CONFLICT)
DB-->>Chunk: RowsAffected
Chunk-->>Internal: inserted count
Internal->>Internal: Accumulate inserted
alt Chunk Error
Internal->>Internal: Break loop
else Context Canceled
Internal->>Internal: Check ctx.Done()
alt Canceled
Internal->>Internal: Break loop, save error
end
end
end
end
rect rgba(200, 150, 100, 0.5)
Note over Internal: Result Calculation
Internal->>Internal: Ignored = Attempted - Inserted
Internal-->>Repo: BulkInsertResult, error (if any)
end
Repo-->>Caller: BulkInsertResult{Attempted,<br/>Inserted, Ignored}, error
🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. 📝 Coding Plan
Comment |
🔒 Security Scan Results —
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/transaction/internal/adapters/postgres/operation/operation_createbulk_test.go`:
- Around line 62-63: Remove the redundant unused-var suppression by deleting the
blank identifier assignment "var _ = generateTestOperations" in the test file;
since generateTestOperations is already invoked in tests (calls around the
existing tests), simply remove that line to clean up the code and avoid an
unnecessary sentinel reference to the function.
In
`@components/transaction/internal/adapters/postgres/operation/operation.postgresql.go`:
- Around line 340-414: CreateBulk and CreateBulkTx duplicate most logic; extract
the shared validation, sorting, chunking, context-check and chunk-insert loop
into a helper like createBulkInternal/createBulkWithExecutor that accepts a
repository.DBExecutor (used by CreateBulk via r.getDB(ctx) and by CreateBulkTx
via the tx param) and a span name; move the common code from CreateBulk and
CreateBulkTx into that helper and have both methods call it (preserve existing
behavior: return partial results on error, set Attempted/Inserted/Ignored, and
keep span/tracing calls using the provided span name).
In
`@components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.go`:
- Around line 53-54: Remove the unnecessary unused-variable suppression: delete
the line that assigns generateTestTransactions to the blank identifier (var _ =
generateTestTransactions). The helper function generateTestTransactions is
already referenced by tests (e.g., in the tests around lines where it's called),
so simply removing that suppression cleans up the file without affecting test
usage.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: cdfaf0aa-7375-4617-bb25-eabb0a6c51a2
📒 Files selected for processing (8)
components/transaction/internal/adapters/postgres/operation/operation.postgresql.gocomponents/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.gocomponents/transaction/internal/adapters/postgres/operation/operation_createbulk_test.gocomponents/transaction/internal/adapters/postgres/transaction/transaction.postgresql.gocomponents/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.gocomponents/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.gopkg/repository/bulk.gopkg/utils/metrics.go
components/transaction/internal/adapters/postgres/operation/operation_createbulk_test.go
Outdated
Show resolved
Hide resolved
components/transaction/internal/adapters/postgres/operation/operation.postgresql.go
Show resolved
Hide resolved
components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.go
Outdated
Show resolved
Hide resolved
📊 Unit Test Coverage Report:
|
| Metric | Value |
|---|---|
| Overall Coverage | 85.4% ✅ PASS |
| Threshold | 85% |
Coverage by Package
| Package | Coverage |
|---|---|
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/grpc/in |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/http/in |
78.5% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/mongodb |
66.7% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/assetrate |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/balance |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/operation |
90.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/operationroute |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/transaction |
99.1% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/transactionroute |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/rabbitmq |
93.1% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/redis/balance |
100.0% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/services/command |
90.4% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/services/query |
95.2% |
github.com/LerianStudio/midaz/v3/components/transaction/internal/services |
100.0% |
Generated by Go PR Analysis workflow
…ting shared logic into createBulkInternal function and improve input validation 🔨
…erations and transactions 🔨
|
This PR is very large (8 files, 1682 lines changed). Consider breaking it into smaller PRs for easier review. |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go (1)
217-247: 🧹 Nitpick | 🔵 TrivialSame redundant nil-element validation as in operation repository.
The nil-element validation at lines 228-236 duplicates the check in
createBulkInternal(lines 287-295). Consider the same cleanup suggested for the operation repository to maintain consistency across both implementations.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go` around lines 217 - 247, The nil-element validation in TransactionPostgreSQLRepository.CreateBulk duplicates the check already present in createBulkInternal; remove the loop that checks for nil transactions from CreateBulk (but keep the empty-slice early return) and rely on createBulkInternal to validate individual entries, so only createBulkInternal performs the per-item nil checks; update any error handling/logging in createBulkInternal if needed to preserve span/log context when called from CreateBulk.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@components/transaction/internal/adapters/postgres/operation/operation.postgresql.go`:
- Around line 267-296: Remove the redundant nil-element loop in CreateBulk and
rely on the existing validation in createBulkInternal; specifically, delete the
for i, op := range operations { if op == nil { ... } } block in CreateBulk so
CreateBulk only does the empty-slice early return and DB acquisition (getDB)
before delegating to createBulkInternal
(postgres.create_bulk_operations_internal), or alternatively factor the
nil-check into a shared helper and call it from both CreateBulk and
createBulkInternal if you want an explicit fast-fail before getDB.
---
Duplicate comments:
In
`@components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go`:
- Around line 217-247: The nil-element validation in
TransactionPostgreSQLRepository.CreateBulk duplicates the check already present
in createBulkInternal; remove the loop that checks for nil transactions from
CreateBulk (but keep the empty-slice early return) and rely on
createBulkInternal to validate individual entries, so only createBulkInternal
performs the per-item nil checks; update any error handling/logging in
createBulkInternal if needed to preserve span/log context when called from
CreateBulk.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f43c504b-9fc4-47b5-bec6-4a6256d17ffb
📒 Files selected for processing (5)
components/transaction/internal/adapters/postgres/operation/operation.postgresql.gocomponents/transaction/internal/adapters/postgres/operation/operation_createbulk_test.gocomponents/transaction/internal/adapters/postgres/transaction/transaction.postgresql.gocomponents/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.gopkg/repository/bulk.go
components/transaction/internal/adapters/postgres/operation/operation.postgresql.go
Show resolved
Hide resolved
|
This PR is very large (8 files, 1694 lines changed). Consider breaking it into smaller PRs for easier review. |
gandalf-at-lerian
left a comment
There was a problem hiding this comment.
Solid implementation. The design decisions are sound and the test coverage is thorough. A few observations worth discussing:
What's well done
DBExecutorinterface is the right abstraction — clean separation between direct DB and tx-controlled paths- Deadlock prevention via ID sorting is correct and the comment explains why
- Chunking logic correctly accounts for PostgreSQL's 65,535 parameter limit per entity (15 cols × 1000 = 15K, 30 cols × 1000 = 30K)
ON CONFLICT (id) DO NOTHINGmakes retries safe without extra coordination- The
BulkInsertResultinvariant (Attempted = Inserted + Ignored) is well-modeled and tested - Partial result on chunk failure is the right call — callers get what was committed + the error
- Test for context cancellation before first chunk is a good edge case
One concern: partial commit semantics
The docstring acknowledges it explicitly, but worth highlighting: CreateBulk (non-Tx) commits chunks independently. If chunk 1 succeeds and chunk 2 fails, those first 1,000 rows are durable with no rollback path. This is safe only because of the idempotency guarantee — a retry will re-insert the same rows and DO NOTHING handles them correctly.
Make sure the caller layer (bulk recorder) is aware of this and implements retry logic. If it ACKs the RabbitMQ batch before calling CreateBulk, a partial failure leaves un-ACK'd messages that will be redelivered — which is actually the desired behavior here. Just confirm the ACK strategy is explicit.
Minor: sort.Slice vs slices.SortFunc
Go 1.21+ ships slices.SortFunc which avoids the closure allocation. Since midaz is already on a recent Go version, this is a low-priority cleanup but worth a note:
slices.SortFunc(operations, func(a, b *Operation) int {
return strings.Compare(a.ID, b.ID)
})The createBulkInternal duplication
Transaction and Operation implementations are structurally identical. The only differences are the entity type and insertXxxChunk call. This is expected in Go given the lack of generics-friendly repository patterns here — acceptable for Phase 1. If a Phase 2 refactor is planned, a generic bulkInsertChunked[T] helper could centralize the chunking/sorting/telemetry loop.
CI: all green. ✅
Approved. Phase 1 foundation looks solid — the Phase 2 (consumer integration) is where the real behavior gets validated.
Summary
Implements bulk insert repository methods (
CreateBulkandCreateBulkTx) for both Transaction and Operation entities as part of the Bulk Recorder feature. This is Phase 1 of the transaction-layer bulk insert implementation, focusing on the repository layer foundation.Motivation
The current transaction layer processes RabbitMQ messages one at a time, with each transaction's operations inserted individually in a loop. For 50 transactions with 5 operations each, this results in 300 database round-trips and 50 individual RabbitMQ ACKs.
With bulk insert at the repository layer:
Expected gain: 10-100x performance improvement depending on bulk size.
Semantic Decision
Feature - Adds new
CreateBulkandCreateBulkTxmethods to Transaction and Operation repositories for multi-row INSERT operations with idempotency support viaON CONFLICT DO NOTHING.Changes
New Files
pkg/repository/bulk.goDBExecutorinterface andBulkInsertResultstruct for tracking attempted/inserted/ignored countscomponents/transaction/internal/adapters/postgres/operation/operation_createbulk_test.goCreateBulkandCreateBulkTxmethods (580+ lines)components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.goCreateBulkandCreateBulkTxmethods (530+ lines)Modified Files
pkg/utils/metrics.gooperation.postgresql.goCreateBulk()andCreateBulkTx()methods with chunking, sorting, and idempotencyoperation.postgresql_mock.gotransaction.postgresql.goCreateBulk()andCreateBulkTx()methods with chunking, sorting, and idempotencytransaction.postgresql_mock.goOpenTelemetry Metrics
bulk_recorder_transactions_attempted_totalbulk_recorder_transactions_inserted_totalbulk_recorder_transactions_ignored_totalbulk_recorder_operations_attempted_totalbulk_recorder_operations_inserted_totalbulk_recorder_operations_ignored_totalbulk_recorder_bulk_sizebulk_recorder_bulk_duration_secondsbulk_recorder_fallback_totalKey Implementation Details
Deadlock Prevention:
Idempotency:
ON CONFLICT (id) DO NOTHINGfor all bulk insertsRowsAffected()correctly reports only newly inserted rowsChunking:
Transaction Support:
CreateBulkTx()accepts aDBExecutorinterfaceTest Plan
CreateBulkwith empty slice returns zero countsCreateBulkwith nil elements returns errorCreateBulksingle item successCreateBulkmultiple items successCreateBulkduplicate handling (ON CONFLICT DO NOTHING)CreateBulkchunking behavior (>1000 items)CreateBulkcontext cancellation between chunksCreateBulkdatabase error handlingCreateBulkTxwith external transactionBulkInsertResultcorrect counting (attempted = inserted + ignored)