diff --git a/components/transaction/internal/adapters/postgres/operation/operation.postgresql.go b/components/transaction/internal/adapters/postgres/operation/operation.postgresql.go index 9ec834033..8b706f6d6 100644 --- a/components/transaction/internal/adapters/postgres/operation/operation.postgresql.go +++ b/components/transaction/internal/adapters/postgres/operation/operation.postgresql.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "reflect" + "sort" "strings" "time" @@ -22,6 +23,7 @@ import ( "github.com/LerianStudio/midaz/v3/pkg" "github.com/LerianStudio/midaz/v3/pkg/constant" "github.com/LerianStudio/midaz/v3/pkg/net/http" + "github.com/LerianStudio/midaz/v3/pkg/repository" "github.com/Masterminds/squirrel" "github.com/bxcodec/dbresolver/v2" "github.com/google/uuid" @@ -44,6 +46,8 @@ type OperationFilter struct { type Repository interface { Create(ctx context.Context, operation *Operation) (*Operation, error) + CreateBulk(ctx context.Context, operations []*Operation) (*repository.BulkInsertResult, error) + CreateBulkTx(ctx context.Context, tx repository.DBExecutor, operations []*Operation) (*repository.BulkInsertResult, error) FindAll(ctx context.Context, organizationID, ledgerID, transactionID uuid.UUID, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error) FindAllByAccount(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, opFilter OperationFilter, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error) Find(ctx context.Context, organizationID, ledgerID, transactionID, id uuid.UUID) (*Operation, error) @@ -250,6 +254,209 @@ func (r *OperationPostgreSQLRepository) Create(ctx context.Context, operation *O return record.ToEntity(), nil } +// CreateBulk inserts multiple operations in bulk using multi-row INSERT with ON CONFLICT DO NOTHING. +// Returns BulkInsertResult with counts of attempted, inserted, and ignored (duplicate) rows. 
+// Operations are sorted by ID before insert to prevent deadlocks in concurrent scenarios. +// Large bulks are automatically chunked to stay within PostgreSQL's parameter limits. +// +// NOTE: Chunks are committed independently. If chunk N fails, chunks 1 to N-1 remain committed. +// On error, partial results are returned along with the error. Retry is safe due to idempotency. +// On error, only Inserted is reliable; Ignored remains 0 since unprocessed chunks are not duplicates. +// +// NOTE: The input slice is sorted in-place by ID. Callers should not rely on original order after this call. +func (r *OperationPostgreSQLRepository) CreateBulk(ctx context.Context, operations []*Operation) (*repository.BulkInsertResult, error) { + logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) + + ctx, span := tracer.Start(ctx, "postgres.create_bulk_operations") + defer span.End() + + // Early return for empty input before acquiring DB connection + if len(operations) == 0 { + return &repository.BulkInsertResult{}, nil + } + + db, err := r.getDB(ctx) + if err != nil { + libOpentelemetry.HandleSpanError(span, "Failed to get database connection", err) + logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to get database connection: %v", err)) + + return nil, err + } + + return r.createBulkInternal(ctx, db, operations, "postgres.create_bulk_operations_internal", "") +} + +// CreateBulkTx inserts multiple operations in bulk using a caller-provided transaction. +// This allows the caller to control transaction boundaries for atomic multi-table operations. +// Returns BulkInsertResult with counts of attempted, inserted, and ignored (duplicate) rows. +// Operations are sorted by ID before insert to prevent deadlocks in concurrent scenarios. +// Large bulks are automatically chunked to stay within PostgreSQL's parameter limits. +// +// NOTE: The caller is responsible for calling Commit() or Rollback() on the transaction. 
+// On error, partial results are returned along with the error. The caller should rollback. +// On error, only Inserted is reliable; Ignored remains 0 since unprocessed chunks are not duplicates. +// +// NOTE: The input slice is sorted in-place by ID. Callers should not rely on original order after this call. +func (r *OperationPostgreSQLRepository) CreateBulkTx(ctx context.Context, tx repository.DBExecutor, operations []*Operation) (*repository.BulkInsertResult, error) { + if tx == nil { + return nil, repository.ErrNilDBExecutor + } + + return r.createBulkInternal(ctx, tx, operations, "postgres.create_bulk_operations_tx", " (tx)") +} + +// createBulkInternal contains the shared logic for CreateBulk and CreateBulkTx. +// It validates input, sorts operations by ID to prevent deadlocks, and inserts in chunks. +// Returns partial results on error with Attempted/Inserted counts. +func (r *OperationPostgreSQLRepository) createBulkInternal( + ctx context.Context, + db repository.DBExecutor, + operations []*Operation, + spanName string, + logSuffix string, +) (*repository.BulkInsertResult, error) { + logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) + + ctx, span := tracer.Start(ctx, spanName) + defer span.End() + + if len(operations) == 0 { + return &repository.BulkInsertResult{}, nil + } + + // Validate no nil elements to prevent panic during sort or insert + for i, op := range operations { + if op == nil { + err := fmt.Errorf("nil operation at index %d", i) + libOpentelemetry.HandleSpanError(span, "Invalid input: nil operation", err) + + return nil, err + } + } + + // Sort by ID (string UUID) to prevent deadlocks in concurrent bulk operations + sort.Slice(operations, func(i, j int) bool { + return operations[i].ID < operations[j].ID + }) + + result := &repository.BulkInsertResult{Attempted: int64(len(operations))} + + // Chunk into bulks of ~1,000 rows to stay within PostgreSQL's parameter limit + // Operation has 30 columns, so 1000 rows = 30,000 
parameters (under 65,535 limit) + const chunkSize = 1000 + + for i := 0; i < len(operations); i += chunkSize { + // Check for context cancellation between chunks + select { + case <-ctx.Done(): + libOpentelemetry.HandleSpanError(span, "Context cancelled during bulk insert", ctx.Err()) + logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf("Context cancelled during bulk insert: %v", ctx.Err())) + + // Return partial result; Ignored stays 0 since remaining items were not processed + return result, ctx.Err() + default: + } + + end := min(i+chunkSize, len(operations)) + + chunkInserted, err := r.insertOperationChunk(ctx, db, operations[i:end]) + if err != nil { + libOpentelemetry.HandleSpanError(span, "Failed to insert operation chunk", err) + logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to insert operation chunk: %v", err)) + + // Return partial result; Ignored stays 0 since remaining items were not processed (not duplicates) + return result, err + } + + result.Inserted += chunkInserted + } + + result.Ignored = result.Attempted - result.Inserted + + logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf("Bulk insert operations%s: attempted=%d, inserted=%d, ignored=%d", + logSuffix, result.Attempted, result.Inserted, result.Ignored)) + + return result, nil +} + +// insertOperationChunk inserts a chunk of operations using multi-row INSERT. +// Uses repository.DBExecutor to work with both dbresolver.DB and dbresolver.Tx. +func (r *OperationPostgreSQLRepository) insertOperationChunk(ctx context.Context, db repository.DBExecutor, operations []*Operation) (int64, error) { + logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) + + ctx, span := tracer.Start(ctx, "postgres.insert_operation_chunk") + defer span.End() + + logger.Log(ctx, libLog.LevelDebug, fmt.Sprintf("Inserting chunk of %d operations", len(operations))) + + builder := squirrel.Insert(r.tableName). + Columns(operationColumnList...). 
+ PlaceholderFormat(squirrel.Dollar) + + for _, op := range operations { + record := &OperationPostgreSQLModel{} + record.FromEntity(op) + + builder = builder.Values( + record.ID, + record.TransactionID, + record.Description, + record.Type, + record.AssetCode, + record.Amount, + record.AvailableBalance, + record.OnHoldBalance, + record.AvailableBalanceAfter, + record.OnHoldBalanceAfter, + record.Status, + record.StatusDescription, + record.AccountID, + record.AccountAlias, + record.BalanceID, + record.ChartOfAccounts, + record.OrganizationID, + record.LedgerID, + record.CreatedAt, + record.UpdatedAt, + record.DeletedAt, + record.Route, + record.BalanceAffected, + record.BalanceKey, + record.VersionBalance, + record.VersionBalanceAfter, + record.Direction, + record.RouteID, + record.RouteCode, + record.RouteDescription, + ) + } + + builder = builder.Suffix("ON CONFLICT (id) DO NOTHING") + + query, args, err := builder.ToSql() + if err != nil { + libOpentelemetry.HandleSpanError(span, "Failed to build bulk insert query", err) + + return 0, err + } + + execResult, err := db.ExecContext(ctx, query, args...) + if err != nil { + libOpentelemetry.HandleSpanError(span, "Failed to execute bulk insert", err) + + return 0, err + } + + rowsAffected, err := execResult.RowsAffected() + if err != nil { + libOpentelemetry.HandleSpanError(span, "Failed to get rows affected", err) + + return 0, err + } + + return rowsAffected, nil +} + // FindAll retrieves Operations entities from the database. 
func (r *OperationPostgreSQLRepository) FindAll(ctx context.Context, organizationID, ledgerID, transactionID uuid.UUID, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error) { logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) diff --git a/components/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.go b/components/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.go index e65bda18b..3b1a555d9 100644 --- a/components/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.go +++ b/components/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.go @@ -3,7 +3,7 @@ // // Generated by this command: // -// mockgen --destination=operation.postgresql_mock.go --package=operation . Repository +// mockgen --destination=components/transaction/internal/adapters/postgres/operation/operation.postgresql_mock.go --package=operation github.com/LerianStudio/midaz/v3/components/transaction/internal/adapters/postgres/operation Repository // // Package operation is a generated GoMock package. @@ -16,6 +16,7 @@ import ( http "github.com/LerianStudio/lib-commons/v4/commons/net/http" http0 "github.com/LerianStudio/midaz/v3/pkg/net/http" + repository "github.com/LerianStudio/midaz/v3/pkg/repository" uuid "github.com/google/uuid" gomock "go.uber.org/mock/gomock" ) @@ -59,6 +60,36 @@ func (mr *MockRepositoryMockRecorder) Create(ctx, operation any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRepository)(nil).Create), ctx, operation) } +// CreateBulk mocks base method. 
+func (m *MockRepository) CreateBulk(ctx context.Context, operations []*Operation) (*repository.BulkInsertResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateBulk", ctx, operations) + ret0, _ := ret[0].(*repository.BulkInsertResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateBulk indicates an expected call of CreateBulk. +func (mr *MockRepositoryMockRecorder) CreateBulk(ctx, operations any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBulk", reflect.TypeOf((*MockRepository)(nil).CreateBulk), ctx, operations) +} + +// CreateBulkTx mocks base method. +func (m *MockRepository) CreateBulkTx(ctx context.Context, tx repository.DBExecutor, operations []*Operation) (*repository.BulkInsertResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateBulkTx", ctx, tx, operations) + ret0, _ := ret[0].(*repository.BulkInsertResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateBulkTx indicates an expected call of CreateBulkTx. +func (mr *MockRepositoryMockRecorder) CreateBulkTx(ctx, tx, operations any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBulkTx", reflect.TypeOf((*MockRepository)(nil).CreateBulkTx), ctx, tx, operations) +} + // Delete mocks base method. func (m *MockRepository) Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error { m.ctrl.T.Helper() diff --git a/components/transaction/internal/adapters/postgres/operation/operation_createbulk_test.go b/components/transaction/internal/adapters/postgres/operation/operation_createbulk_test.go new file mode 100644 index 000000000..8406ba966 --- /dev/null +++ b/components/transaction/internal/adapters/postgres/operation/operation_createbulk_test.go @@ -0,0 +1,597 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. 
+// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package operation + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "testing" + "time" + + tmcore "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/core" + "github.com/LerianStudio/midaz/v3/pkg/repository" + "github.com/bxcodec/dbresolver/v2" + "github.com/google/uuid" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// generateTestOperation creates a test operation with the given ID or generates a new UUID. +func generateTestOperation(id string) *Operation { + if id == "" { + id = uuid.NewString() + } + + now := time.Now().UTC() + amount := decimal.NewFromInt(100) + + return &Operation{ + ID: id, + TransactionID: uuid.NewString(), + Description: "Test operation " + id[:8], + Type: "DEBIT", + AssetCode: "USD", + Amount: Amount{Value: &amount}, + Status: Status{Code: "ACTIVE"}, + AccountID: uuid.NewString(), + AccountAlias: "@test", + BalanceID: uuid.NewString(), + ChartOfAccounts: "1000", + OrganizationID: uuid.NewString(), + LedgerID: uuid.NewString(), + CreatedAt: now, + UpdatedAt: now, + } +} + +// generateTestOperations creates n test operations with unique UUIDs. 
+func generateTestOperations(n int) []*Operation { + operations := make([]*Operation, n) + for i := range n { + operations[i] = generateTestOperation("") + } + + return operations +} + +func TestOperationCreateBulk_EmptyInput(t *testing.T) { + t.Parallel() + + repo := &OperationPostgreSQLRepository{ + connection: nil, // Will return empty result before DB call + tableName: "operation", + } + + result, err := repo.CreateBulk(context.Background(), []*Operation{}) + + require.NoError(t, err, "empty input should not error") + assert.Equal(t, int64(0), result.Attempted) + assert.Equal(t, int64(0), result.Inserted) + assert.Equal(t, int64(0), result.Ignored) +} + +func TestOperationCreateBulk_NilInput(t *testing.T) { + t.Parallel() + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + } + + result, err := repo.CreateBulk(context.Background(), nil) + + require.NoError(t, err, "nil input should be treated as empty") + assert.Equal(t, int64(0), result.Attempted) + assert.Equal(t, int64(0), result.Inserted) + assert.Equal(t, int64(0), result.Ignored) +} + +func TestOperationCreateBulk_NilElementInSlice(t *testing.T) { + t.Parallel() + + mockDB := &mockOperationDB{} + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + operations := []*Operation{ + generateTestOperation(""), + nil, // nil element + generateTestOperation(""), + } + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err, "should error on nil element") + assert.Nil(t, result) + assert.Contains(t, err.Error(), "nil operation at index 1") +} + +func TestOperationCreateBulk_NilElementAtStart(t *testing.T) { + t.Parallel() + + mockDB := &mockOperationDB{} + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: 
nil, + tableName: "operation", + requireTenant: false, + } + + operations := []*Operation{ + nil, // nil at index 0 + generateTestOperation(""), + } + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "nil operation at index 0") +} + +func TestOperationCreateBulk_NilElementAtEnd(t *testing.T) { + t.Parallel() + + mockDB := &mockOperationDB{} + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + operations := []*Operation{ + generateTestOperation(""), + generateTestOperation(""), + nil, // nil at end + } + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "nil operation at index 2") +} + +func TestOperationCreateBulk_SortsInputByID(t *testing.T) { + t.Parallel() + + // Create operations with IDs that will sort differently + op1 := generateTestOperation("ffffffff-ffff-ffff-ffff-ffffffffffff") // Highest + op2 := generateTestOperation("00000000-0000-0000-0000-000000000001") // Lowest + op3 := generateTestOperation("88888888-8888-8888-8888-888888888888") // Middle + + input := []*Operation{op1, op2, op3} + + // Verify initial order: op1 (highest) is first + assert.Equal(t, op1.ID, input[0].ID, "original order should have op1 first") + + // Verify lexicographic ordering assumption + assert.True(t, op2.ID < op3.ID, "op2 should be less than op3") + assert.True(t, op3.ID < op1.ID, "op3 should be less than op1") + + // Create mock DB that returns success + mockDB := &mockOperationDB{ + rowsAffected: 3, + } + + // Inject mock DB into context using tenant manager + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: 
false, + } + + // Call CreateBulk which sorts the slice in-place before inserting + result, err := repo.CreateBulk(ctx, input) + + require.NoError(t, err) + require.NotNil(t, result) + + // Verify the slice was sorted in-place by ID (ascending) + // Expected order after sort: op2 (lowest) -> op3 (middle) -> op1 (highest) + assert.Equal(t, op2.ID, input[0].ID, "after CreateBulk, first element should be op2 (lowest ID)") + assert.Equal(t, op3.ID, input[1].ID, "after CreateBulk, second element should be op3 (middle ID)") + assert.Equal(t, op1.ID, input[2].ID, "after CreateBulk, third element should be op1 (highest ID)") +} + +func TestOperationCreateBulk_ChunkingBoundaryConditions(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + inputCount int + expectedChunks int + }{ + {"single_item", 1, 1}, + {"exactly_999", 999, 1}, + {"exactly_1000", 1000, 1}, + {"exactly_1001", 1001, 2}, + {"exactly_2000", 2000, 2}, + {"exactly_2001", 2001, 3}, + {"exactly_3000", 3000, 3}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Verify the chunking math + const chunkSize = 1000 + chunks := (tt.inputCount + chunkSize - 1) / chunkSize + assert.Equal(t, tt.expectedChunks, chunks, "chunk count should match") + }) + } +} + +func TestOperationBulkInsertResult_Invariant(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + attempted int64 + inserted int64 + }{ + {"all_inserted", 100, 100}, + {"all_ignored", 100, 0}, + {"partial", 100, 75}, + {"single_inserted", 1, 1}, + {"single_ignored", 1, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result := &repository.BulkInsertResult{ + Attempted: tt.attempted, + Inserted: tt.inserted, + Ignored: tt.attempted - tt.inserted, + } + + // Verify invariant: Attempted = Inserted + Ignored + assert.Equal(t, result.Attempted, result.Inserted+result.Ignored, + "invariant failed: Attempted should equal Inserted + Ignored") + }) + 
} +} + +func TestOperationBulkInsertResult_ZeroValues(t *testing.T) { + t.Parallel() + + result := &repository.BulkInsertResult{} + + assert.Equal(t, int64(0), result.Attempted) + assert.Equal(t, int64(0), result.Inserted) + assert.Equal(t, int64(0), result.Ignored) +} + +// mockOperationDB implements dbresolver.DB for testing +type mockOperationDB struct { + execErr error + rowsAffected int64 + rowsAffectedErr error +} + +func (m *mockOperationDB) Begin() (dbresolver.Tx, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (dbresolver.Tx, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) Close() error { + return nil +} + +func (m *mockOperationDB) Conn(ctx context.Context) (dbresolver.Conn, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) Driver() driver.Driver { + return nil +} + +func (m *mockOperationDB) Exec(query string, args ...any) (sql.Result, error) { + return m.ExecContext(context.Background(), query, args...) 
+} + +func (m *mockOperationDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + if m.execErr != nil { + return nil, m.execErr + } + + return &mockOperationResult{rowsAffected: m.rowsAffected, rowsAffectedErr: m.rowsAffectedErr}, nil +} + +func (m *mockOperationDB) Ping() error { + return nil +} + +func (m *mockOperationDB) PingContext(ctx context.Context) error { + return nil +} + +func (m *mockOperationDB) Prepare(query string) (dbresolver.Stmt, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) PrepareContext(ctx context.Context, query string) (dbresolver.Stmt, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) Query(query string, args ...any) (*sql.Rows, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { + return nil, errors.New("not implemented") +} + +func (m *mockOperationDB) QueryRow(query string, args ...any) *sql.Row { + return nil +} + +func (m *mockOperationDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { + return nil +} + +func (m *mockOperationDB) SetConnMaxIdleTime(d time.Duration) {} + +func (m *mockOperationDB) SetConnMaxLifetime(d time.Duration) {} + +func (m *mockOperationDB) SetMaxIdleConns(n int) {} + +func (m *mockOperationDB) SetMaxOpenConns(n int) {} + +func (m *mockOperationDB) PrimaryDBs() []*sql.DB { + return nil +} + +func (m *mockOperationDB) ReplicaDBs() []*sql.DB { + return nil +} + +func (m *mockOperationDB) Stats() sql.DBStats { + return sql.DBStats{} +} + +type mockOperationResult struct { + rowsAffected int64 + rowsAffectedErr error +} + +func (m *mockOperationResult) LastInsertId() (int64, error) { + return 0, nil +} + +func (m *mockOperationResult) RowsAffected() (int64, error) { + if m.rowsAffectedErr != nil { + return 0, m.rowsAffectedErr + } + + return m.rowsAffected, nil +} + 
+func TestInsertOperationChunk_ColumnCount(t *testing.T) { + t.Parallel() + + // Verify that operationColumnList has expected number of columns + // This ensures the bulk insert won't have column/value mismatch + expectedColumns := 30 // Based on operationColumnList definition + assert.Equal(t, expectedColumns, len(operationColumnList), + "operationColumnList should have %d columns", expectedColumns) +} + +func TestInsertOperationChunk_ParameterLimitCalculation(t *testing.T) { + t.Parallel() + + // Verify that 1000 rows * 30 columns stays under PostgreSQL's 65,535 limit + const chunkSize = 1000 + const columnCount = 30 // operationColumnList length + const postgresLimit = 65535 + + parametersPerChunk := chunkSize * columnCount + assert.Less(t, parametersPerChunk, postgresLimit, + "parameters per chunk (%d) should be less than PostgreSQL limit (%d)", + parametersPerChunk, postgresLimit) +} + +func TestOperationCreateBulk_AllNilElements(t *testing.T) { + t.Parallel() + + mockDB := &mockOperationDB{} + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + operations := []*Operation{nil, nil, nil} + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "nil operation at index 0") +} + +func TestOperationCreateBulk_MultipleNilElements(t *testing.T) { + t.Parallel() + + mockDB := &mockOperationDB{} + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + operations := []*Operation{ + generateTestOperation(""), + nil, + generateTestOperation(""), + nil, + } + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err) + assert.Nil(t, result) + // Should fail on first nil 
(index 1) + assert.Contains(t, err.Error(), "nil operation at index 1") +} + +// mockOperationDBSequence tracks call count and returns different results per call +type mockOperationDBSequence struct { + mockOperationDB + callCount int + resultsPerCall []mockCallResult +} + +type mockCallResult struct { + err error + rowsAffected int64 +} + +func (m *mockOperationDBSequence) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + if m.callCount < len(m.resultsPerCall) { + result := m.resultsPerCall[m.callCount] + m.callCount++ + + if result.err != nil { + return nil, result.err + } + + return &mockOperationResult{rowsAffected: result.rowsAffected}, nil + } + + m.callCount++ + + return &mockOperationResult{rowsAffected: 0}, nil +} + +func TestOperationCreateBulk_ChunkFailure_PartialResult(t *testing.T) { + t.Parallel() + + // Create 2001 operations to trigger 3 chunks (1000 + 1000 + 1) + operations := generateTestOperations(2001) + + // Mock: chunk 1 succeeds (1000 rows), chunk 2 fails + dbErr := errors.New("database connection lost") + mockDB := &mockOperationDBSequence{ + resultsPerCall: []mockCallResult{ + {rowsAffected: 1000}, // Chunk 1: success + {err: dbErr}, // Chunk 2: failure + }, + } + + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + result, err := repo.CreateBulk(ctx, operations) + + // Should return error + require.Error(t, err) + assert.Equal(t, dbErr, err) + + // Should return partial result + require.NotNil(t, result) + assert.Equal(t, int64(2001), result.Attempted, "Attempted should be total count") + assert.Equal(t, int64(1000), result.Inserted, "Inserted should reflect chunk 1 only") + assert.Equal(t, int64(0), result.Ignored, "Ignored should be 0 on error (unprocessed items are not duplicates)") +} + +func TestOperationCreateBulk_FirstChunkFailure(t 
*testing.T) { + t.Parallel() + + operations := generateTestOperations(500) + + // Mock: first chunk fails immediately + dbErr := errors.New("connection refused") + mockDB := &mockOperationDBSequence{ + resultsPerCall: []mockCallResult{ + {err: dbErr}, // Chunk 1: failure + }, + } + + ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + result, err := repo.CreateBulk(ctx, operations) + + require.Error(t, err) + assert.Equal(t, dbErr, err) + + require.NotNil(t, result) + assert.Equal(t, int64(500), result.Attempted) + assert.Equal(t, int64(0), result.Inserted, "No rows should be inserted when first chunk fails") + assert.Equal(t, int64(0), result.Ignored, "Ignored should be 0 on error") +} + +func TestOperationCreateBulk_ContextCancellation(t *testing.T) { + t.Parallel() + + // Create enough operations to require multiple chunks + operations := generateTestOperations(2500) + + // Mock: chunk 1 succeeds, then context is cancelled before chunk 2 + mockDB := &mockOperationDBSequence{ + resultsPerCall: []mockCallResult{ + {rowsAffected: 1000}, // Chunk 1: success + {rowsAffected: 1000}, // Chunk 2: would succeed but context cancelled + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + ctx = tmcore.ContextWithModulePGConnection(ctx, "transaction", mockDB) + + repo := &OperationPostgreSQLRepository{ + connection: nil, + tableName: "operation", + requireTenant: false, + } + + // Cancel context before calling CreateBulk + cancel() + + result, err := repo.CreateBulk(ctx, operations) + + // Should return context error + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) + + // Should return partial result (0 since cancelled before first chunk) + require.NotNil(t, result) + assert.Equal(t, int64(2500), result.Attempted) + assert.Equal(t, int64(0), result.Inserted, "No rows inserted when 
context cancelled before first chunk") + assert.Equal(t, int64(0), result.Ignored) +} diff --git a/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go b/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go index 09ebd4bc1..1208ab4d5 100644 --- a/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go +++ b/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "reflect" + "sort" "strconv" "strings" "time" @@ -26,6 +27,7 @@ import ( "github.com/LerianStudio/midaz/v3/pkg" "github.com/LerianStudio/midaz/v3/pkg/constant" "github.com/LerianStudio/midaz/v3/pkg/net/http" + "github.com/LerianStudio/midaz/v3/pkg/repository" "github.com/Masterminds/squirrel" "github.com/bxcodec/dbresolver/v2" "github.com/google/uuid" @@ -74,6 +76,8 @@ var transactionColumnListPrefixed = []string{ // It defines methods for creating, retrieving, updating, and deleting transactions. type Repository interface { Create(ctx context.Context, transaction *Transaction) (*Transaction, error) + CreateBulk(ctx context.Context, transactions []*Transaction) (*repository.BulkInsertResult, error) + CreateBulkTx(ctx context.Context, tx repository.DBExecutor, transactions []*Transaction) (*repository.BulkInsertResult, error) FindAll(ctx context.Context, organizationID, ledgerID uuid.UUID, filter http.Pagination) ([]*Transaction, libHTTP.CursorPagination, error) Find(ctx context.Context, organizationID, ledgerID, id uuid.UUID) (*Transaction, error) FindByParentID(ctx context.Context, organizationID, ledgerID, parentID uuid.UUID) (*Transaction, error) @@ -200,6 +204,194 @@ func (r *TransactionPostgreSQLRepository) Create(ctx context.Context, transactio return record.ToEntity(), nil } +// CreateBulk inserts multiple transactions in bulk using multi-row INSERT with ON CONFLICT DO NOTHING. 
// Returns BulkInsertResult with counts of attempted, inserted, and ignored (duplicate) rows.
// Transactions are sorted by ID before insert to prevent deadlocks in concurrent scenarios.
// Large bulks are automatically chunked to stay within PostgreSQL's parameter limits.
//
// NOTE: Chunks are committed independently. If chunk N fails, chunks 1 to N-1 remain committed.
// On error, partial results are returned along with the error. Retry is safe due to idempotency
// (duplicates are absorbed by ON CONFLICT DO NOTHING).
// On error, only Inserted is reliable; Ignored remains 0 since unprocessed chunks are not duplicates.
//
// NOTE: The input slice is sorted in-place by ID. Callers should not rely on original order after this call.
func (r *TransactionPostgreSQLRepository) CreateBulk(ctx context.Context, transactions []*Transaction) (*repository.BulkInsertResult, error) {
	logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

	ctx, span := tracer.Start(ctx, "postgres.create_bulk_transactions")
	defer span.End()

	// Early return for empty input before acquiring DB connection
	if len(transactions) == 0 {
		return &repository.BulkInsertResult{}, nil
	}

	db, err := r.getDB(ctx)
	if err != nil {
		libOpentelemetry.HandleSpanError(span, "Failed to get database connection", err)
		logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to get database connection: %v", err))

		return nil, err
	}

	return r.createBulkInternal(ctx, db, transactions, "postgres.create_bulk_transactions_internal", "")
}

// CreateBulkTx inserts multiple transactions in bulk using a caller-provided transaction.
// This allows the caller to control transaction boundaries for atomic multi-table operations.
// Returns BulkInsertResult with counts of attempted, inserted, and ignored (duplicate) rows.
// Transactions are sorted by ID before insert to prevent deadlocks in concurrent scenarios.
// Large bulks are automatically chunked to stay within PostgreSQL's parameter limits.
//
// NOTE: The caller is responsible for calling Commit() or Rollback() on the transaction.
// On error, partial results are returned along with the error. The caller should rollback.
// On error, only Inserted is reliable; Ignored remains 0 since unprocessed chunks are not duplicates.
//
// NOTE: The input slice is sorted in-place by ID. Callers should not rely on original order after this call.
func (r *TransactionPostgreSQLRepository) CreateBulkTx(ctx context.Context, tx repository.DBExecutor, transactions []*Transaction) (*repository.BulkInsertResult, error) {
	if tx == nil {
		return nil, repository.ErrNilDBExecutor
	}

	return r.createBulkInternal(ctx, tx, transactions, "postgres.create_bulk_transactions_tx", " (tx)")
}

// createBulkInternal contains the shared logic for CreateBulk and CreateBulkTx.
// It validates input, sorts transactions by ID to prevent deadlocks, and inserts in chunks.
// Returns partial results on error with Attempted/Inserted counts.
func (r *TransactionPostgreSQLRepository) createBulkInternal(
	ctx context.Context,
	db repository.DBExecutor,
	transactions []*Transaction,
	spanName string,
	logSuffix string,
) (*repository.BulkInsertResult, error) {
	logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

	ctx, span := tracer.Start(ctx, spanName)
	defer span.End()

	if len(transactions) == 0 {
		return &repository.BulkInsertResult{}, nil
	}

	// Validate no nil elements to prevent panic during sort or insert
	for i, txn := range transactions {
		if txn == nil {
			err := fmt.Errorf("nil transaction at index %d", i)
			libOpentelemetry.HandleSpanError(span, "Invalid input: nil transaction", err)

			return nil, err
		}
	}

	// Sort by ID (string UUID) to prevent deadlocks in concurrent bulk operations:
	// concurrent bulks touching overlapping IDs then acquire row locks in the same order.
	sort.Slice(transactions, func(i, j int) bool {
		return transactions[i].ID < transactions[j].ID
	})

	result := &repository.BulkInsertResult{Attempted: int64(len(transactions))}

	// Chunk into bulks of ~1,000 rows to stay within PostgreSQL's parameter limit
	// Transaction has 15 columns, so 1000 rows = 15,000 parameters (under 65,535 limit)
	const chunkSize = 1000

	for i := 0; i < len(transactions); i += chunkSize {
		// Check for context cancellation between chunks
		select {
		case <-ctx.Done():
			libOpentelemetry.HandleSpanError(span, "Context cancelled during bulk insert", ctx.Err())
			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf("Context cancelled during bulk insert: %v", ctx.Err()))

			// Return partial result; Ignored stays 0 since remaining items were not processed
			return result, ctx.Err()
		default:
		}

		end := min(i+chunkSize, len(transactions))

		chunkInserted, err := r.insertTransactionChunk(ctx, db, transactions[i:end])
		if err != nil {
			libOpentelemetry.HandleSpanError(span, "Failed to insert transaction chunk", err)
			logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to insert transaction chunk: %v", err))

			// Return partial result; Ignored stays 0 since remaining items were not processed (not duplicates)
			return result, err
		}

		result.Inserted += chunkInserted
	}

	result.Ignored = result.Attempted - result.Inserted

	logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf("Bulk insert transactions%s: attempted=%d, inserted=%d, ignored=%d",
		logSuffix, result.Attempted, result.Inserted, result.Ignored))

	return result, nil
}

// insertTransactionChunk inserts a chunk of transactions using multi-row INSERT.
// Uses repository.DBExecutor to work with both dbresolver.DB and dbresolver.Tx.
// Returns the number of rows actually inserted (duplicates are skipped via ON CONFLICT DO NOTHING
// and are therefore excluded from RowsAffected).
func (r *TransactionPostgreSQLRepository) insertTransactionChunk(ctx context.Context, db repository.DBExecutor, transactions []*Transaction) (int64, error) {
	logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

	ctx, span := tracer.Start(ctx, "postgres.insert_transaction_chunk")
	defer span.End()

	logger.Log(ctx, libLog.LevelDebug, fmt.Sprintf("Inserting chunk of %d transactions", len(transactions)))

	builder := squirrel.Insert(r.tableName).
		Columns(transactionColumnList...).
		PlaceholderFormat(squirrel.Dollar)

	for _, tx := range transactions {
		record := &TransactionPostgreSQLModel{}
		record.FromEntity(tx)

		// IMPORTANT: the order of Values below must match transactionColumnList exactly.
		builder = builder.Values(
			record.ID,
			record.ParentTransactionID,
			record.Description,
			record.Status,
			record.StatusDescription,
			record.Amount,
			record.AssetCode,
			record.ChartOfAccountsGroupName,
			record.LedgerID,
			record.OrganizationID,
			record.Body,
			record.CreatedAt,
			record.UpdatedAt,
			record.DeletedAt,
			record.Route,
		)
	}

	builder = builder.Suffix("ON CONFLICT (id) DO NOTHING")

	query, args, err := builder.ToSql()
	if err != nil {
		libOpentelemetry.HandleSpanError(span, "Failed to build bulk insert query", err)

		return 0, err
	}

	execResult, err := db.ExecContext(ctx, query, args...)
	if err != nil {
		libOpentelemetry.HandleSpanError(span, "Failed to execute bulk insert", err)

		return 0, err
	}

	rowsAffected, err := execResult.RowsAffected()
	if err != nil {
		libOpentelemetry.HandleSpanError(span, "Failed to get rows affected", err)

		return 0, err
	}

	return rowsAffected, nil
}

// FindAll retrieves Transactions entities from the database.
func (r *TransactionPostgreSQLRepository) FindAll(ctx context.Context, organizationID, ledgerID uuid.UUID, filter http.Pagination) ([]*Transaction, libHTTP.CursorPagination, error) { logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) diff --git a/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.go b/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.go index 00e364fdc..392e66ba8 100644 --- a/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.go +++ b/components/transaction/internal/adapters/postgres/transaction/transaction.postgresql_mock.go @@ -15,6 +15,7 @@ import ( http "github.com/LerianStudio/lib-commons/v4/commons/net/http" http0 "github.com/LerianStudio/midaz/v3/pkg/net/http" + repository "github.com/LerianStudio/midaz/v3/pkg/repository" uuid "github.com/google/uuid" gomock "go.uber.org/mock/gomock" ) @@ -58,6 +59,36 @@ func (mr *MockRepositoryMockRecorder) Create(ctx, transaction any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRepository)(nil).Create), ctx, transaction) } +// CreateBulk mocks base method. +func (m *MockRepository) CreateBulk(ctx context.Context, transactions []*Transaction) (*repository.BulkInsertResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateBulk", ctx, transactions) + ret0, _ := ret[0].(*repository.BulkInsertResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateBulk indicates an expected call of CreateBulk. +func (mr *MockRepositoryMockRecorder) CreateBulk(ctx, transactions any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBulk", reflect.TypeOf((*MockRepository)(nil).CreateBulk), ctx, transactions) +} + +// CreateBulkTx mocks base method. 
+func (m *MockRepository) CreateBulkTx(ctx context.Context, tx repository.DBExecutor, transactions []*Transaction) (*repository.BulkInsertResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateBulkTx", ctx, tx, transactions) + ret0, _ := ret[0].(*repository.BulkInsertResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateBulkTx indicates an expected call of CreateBulkTx. +func (mr *MockRepositoryMockRecorder) CreateBulkTx(ctx, tx, transactions any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBulkTx", reflect.TypeOf((*MockRepository)(nil).CreateBulkTx), ctx, tx, transactions) +} + // Delete mocks base method. func (m *MockRepository) Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error { m.ctrl.T.Helper() diff --git a/components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.go b/components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.go new file mode 100644 index 000000000..b33f285e7 --- /dev/null +++ b/components/transaction/internal/adapters/postgres/transaction/transaction_createbulk_test.go @@ -0,0 +1,539 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package transaction + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "testing" + "time" + + tmcore "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/core" + "github.com/LerianStudio/midaz/v3/pkg/repository" + "github.com/bxcodec/dbresolver/v2" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// generateTestTransaction creates a test transaction with the given ID or generates a new UUID. 
func generateTestTransaction(id string) *Transaction {
	if id == "" {
		id = uuid.NewString()
	}

	now := time.Now().UTC()

	return &Transaction{
		ID:             id,
		Description:    "Test transaction " + id[:8],
		Status:         Status{Code: "PENDING"},
		AssetCode:      "USD",
		LedgerID:       uuid.NewString(),
		OrganizationID: uuid.NewString(),
		CreatedAt:      now,
		UpdatedAt:      now,
	}
}

// generateTestTransactions creates n test transactions with unique UUIDs.
func generateTestTransactions(n int) []*Transaction {
	transactions := make([]*Transaction, n)
	for i := range n {
		transactions[i] = generateTestTransaction("")
	}

	return transactions
}

// TestCreateBulk_EmptyInput verifies the early-return path: an empty slice must
// produce a zero-valued result without ever touching the database connection.
func TestCreateBulk_EmptyInput(t *testing.T) {
	t.Parallel()

	repo := &TransactionPostgreSQLRepository{
		connection: nil, // Will return empty result before DB call
		tableName:  "transaction",
	}

	result, err := repo.CreateBulk(context.Background(), []*Transaction{})

	require.NoError(t, err, "empty input should not error")
	assert.Equal(t, int64(0), result.Attempted)
	assert.Equal(t, int64(0), result.Inserted)
	assert.Equal(t, int64(0), result.Ignored)
}

// TestCreateBulk_NilInput verifies a nil slice behaves like an empty one.
func TestCreateBulk_NilInput(t *testing.T) {
	t.Parallel()

	repo := &TransactionPostgreSQLRepository{
		connection: nil,
		tableName:  "transaction",
	}

	result, err := repo.CreateBulk(context.Background(), nil)

	require.NoError(t, err, "nil input should be treated as empty")
	assert.Equal(t, int64(0), result.Attempted)
	assert.Equal(t, int64(0), result.Inserted)
	assert.Equal(t, int64(0), result.Ignored)
}

// TestCreateBulk_NilElementInSlice verifies that a nil element is rejected with
// an error naming its index, before any insert is attempted.
func TestCreateBulk_NilElementInSlice(t *testing.T) {
	t.Parallel()

	mockDB := &bulkMockDB{}
	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	transactions := []*Transaction{
		generateTestTransaction(""),
		nil, // nil element
		generateTestTransaction(""),
	}

	result, err := repo.CreateBulk(ctx, transactions)

	require.Error(t, err, "should error on nil element")
	assert.Nil(t, result)
	assert.Contains(t, err.Error(), "nil transaction at index 1")
}

// TestCreateBulk_NilElementAtStart covers the nil-check boundary at index 0.
func TestCreateBulk_NilElementAtStart(t *testing.T) {
	t.Parallel()

	mockDB := &bulkMockDB{}
	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	transactions := []*Transaction{
		nil, // nil at index 0
		generateTestTransaction(""),
	}

	result, err := repo.CreateBulk(ctx, transactions)

	require.Error(t, err)
	assert.Nil(t, result)
	assert.Contains(t, err.Error(), "nil transaction at index 0")
}

// TestCreateBulk_NilElementAtEnd covers the nil-check boundary at the last index.
func TestCreateBulk_NilElementAtEnd(t *testing.T) {
	t.Parallel()

	mockDB := &bulkMockDB{}
	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	transactions := []*Transaction{
		generateTestTransaction(""),
		generateTestTransaction(""),
		nil, // nil at end
	}

	result, err := repo.CreateBulk(ctx, transactions)

	require.Error(t, err)
	assert.Nil(t, result)
	assert.Contains(t, err.Error(), "nil transaction at index 2")
}

// TestCreateBulk_SortsInputByID verifies the documented in-place sort-by-ID
// (the deadlock-avoidance ordering) is observable on the caller's slice.
func TestCreateBulk_SortsInputByID(t *testing.T) {
	t.Parallel()

	// Create transactions with IDs that will sort differently
	tx1 := generateTestTransaction("ffffffff-ffff-ffff-ffff-ffffffffffff") // Highest
	tx2 := generateTestTransaction("00000000-0000-0000-0000-000000000001") // Lowest
	tx3 := generateTestTransaction("88888888-8888-8888-8888-888888888888") // Middle

	input := []*Transaction{tx1, tx2, tx3}

	// Verify initial order: tx1 (highest) is first
	assert.Equal(t, tx1.ID, input[0].ID, "original order should have tx1 first")

	// Verify lexicographic ordering assumption
	assert.True(t, tx2.ID < tx3.ID, "tx2 should be less than tx3")
	assert.True(t, tx3.ID < tx1.ID, "tx3 should be less than tx1")

	// Create mock DB that returns success
	mockDB := &bulkMockDB{
		rowsAffected: 3,
	}

	// Inject mock DB into context using tenant manager
	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	// Call CreateBulk which sorts the slice in-place before inserting
	result, err := repo.CreateBulk(ctx, input)

	require.NoError(t, err)
	require.NotNil(t, result)

	// Verify the slice was sorted in-place by ID (ascending)
	// Expected order after sort: tx2 (lowest) -> tx3 (middle) -> tx1 (highest)
	assert.Equal(t, tx2.ID, input[0].ID, "after CreateBulk, first element should be tx2 (lowest ID)")
	assert.Equal(t, tx3.ID, input[1].ID, "after CreateBulk, second element should be tx3 (middle ID)")
	assert.Equal(t, tx1.ID, input[2].ID, "after CreateBulk, third element should be tx1 (highest ID)")
}

// TestCreateBulk_ChunkingBoundaryConditions checks the ceiling-division math
// around the 1000-row chunk size (pure arithmetic; no DB involved).
func TestCreateBulk_ChunkingBoundaryConditions(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name           string
		inputCount     int
		expectedChunks int
	}{
		{"single_item", 1, 1},
		{"exactly_999", 999, 1},
		{"exactly_1000", 1000, 1},
		{"exactly_1001", 1001, 2},
		{"exactly_2000", 2000, 2},
		{"exactly_2001", 2001, 3},
		{"exactly_3000", 3000, 3},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			// Verify the chunking math
			const chunkSize = 1000
			chunks := (tt.inputCount + chunkSize - 1) / chunkSize
			assert.Equal(t, tt.expectedChunks, chunks, "chunk count should match")
		})
	}
}

// TestBulkInsertResult_Invariant asserts Attempted == Inserted + Ignored for
// representative count combinations.
func TestBulkInsertResult_Invariant(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name      string
		attempted int64
		inserted  int64
	}{
		{"all_inserted", 100, 100},
		{"all_ignored", 100, 0},
		{"partial", 100, 75},
		{"single_inserted", 1, 1},
		{"single_ignored", 1, 0},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			result := &repository.BulkInsertResult{
				Attempted: tt.attempted,
				Inserted:  tt.inserted,
				Ignored:   tt.attempted - tt.inserted,
			}

			// Verify invariant: Attempted = Inserted + Ignored
			assert.Equal(t, result.Attempted, result.Inserted+result.Ignored,
				"invariant failed: Attempted should equal Inserted + Ignored")
		})
	}
}

// TestBulkInsertResult_ZeroValues confirms the zero value of BulkInsertResult is usable.
func TestBulkInsertResult_ZeroValues(t *testing.T) {
	t.Parallel()

	result := &repository.BulkInsertResult{}

	assert.Equal(t, int64(0), result.Attempted)
	assert.Equal(t, int64(0), result.Inserted)
	assert.Equal(t, int64(0), result.Ignored)
}

// bulkMockDB implements dbresolver.DB for testing bulk operations.
// Only ExecContext is functional; everything else is a stub.
// NOTE(review): not synchronized — intended for single-goroutine use per test.
type bulkMockDB struct {
	execErr         error
	rowsAffected    int64
	rowsAffectedErr error
}

func (m *bulkMockDB) Begin() (dbresolver.Tx, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (dbresolver.Tx, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) Close() error {
	return nil
}

func (m *bulkMockDB) Conn(ctx context.Context) (dbresolver.Conn, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) Driver() driver.Driver {
	return nil
}

// Exec delegates to ExecContext so both entry points share the mock behavior.
func (m *bulkMockDB) Exec(query string, args ...any) (sql.Result, error) {
	return m.ExecContext(context.Background(), query, args...)
}

func (m *bulkMockDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	if m.execErr != nil {
		return nil, m.execErr
	}

	return &bulkMockResult{rowsAffected: m.rowsAffected, rowsAffectedErr: m.rowsAffectedErr}, nil
}

func (m *bulkMockDB) Ping() error {
	return nil
}

func (m *bulkMockDB) PingContext(ctx context.Context) error {
	return nil
}

func (m *bulkMockDB) Prepare(query string) (dbresolver.Stmt, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) PrepareContext(ctx context.Context, query string) (dbresolver.Stmt, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) Query(query string, args ...any) (*sql.Rows, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	return nil, errors.New("not implemented")
}

func (m *bulkMockDB) QueryRow(query string, args ...any) *sql.Row {
	return nil
}

func (m *bulkMockDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
	return nil
}

func (m *bulkMockDB) SetConnMaxIdleTime(d time.Duration) {}

func (m *bulkMockDB) SetConnMaxLifetime(d time.Duration) {}

func (m *bulkMockDB) SetMaxIdleConns(n int) {}

func (m *bulkMockDB) SetMaxOpenConns(n int) {}

func (m *bulkMockDB) PrimaryDBs() []*sql.DB {
	return nil
}

func (m *bulkMockDB) ReplicaDBs() []*sql.DB {
	return nil
}

func (m *bulkMockDB) Stats() sql.DBStats {
	return sql.DBStats{}
}

// bulkMockResult is the sql.Result returned by bulkMockDB/bulkMockDBSequence.
type bulkMockResult struct {
	rowsAffected    int64
	rowsAffectedErr error
}

func (m *bulkMockResult) LastInsertId() (int64, error) {
	return 0, nil
}

func (m *bulkMockResult) RowsAffected() (int64, error) {
	if m.rowsAffectedErr != nil {
		return 0, m.rowsAffectedErr
	}

	return m.rowsAffected, nil
}

func TestInsertTransactionChunk_ColumnCount(t *testing.T) {
	t.Parallel()

	// Verify that
// transactionColumnList has the expected number of columns.
	// This ensures the bulk insert won't have column/value mismatch
	expectedColumns := 15 // Based on transactionColumnList definition
	assert.Equal(t, expectedColumns, len(transactionColumnList),
		"transactionColumnList should have %d columns", expectedColumns)
}

// TestInsertTransactionChunk_ParameterLimitCalculation guards the chunk-size
// choice against PostgreSQL's 65,535 bind-parameter ceiling.
func TestInsertTransactionChunk_ParameterLimitCalculation(t *testing.T) {
	t.Parallel()

	// Verify that 1000 rows * 15 columns stays under PostgreSQL's 65,535 limit
	const chunkSize = 1000
	const columnCount = 15 // transactionColumnList length
	const postgresLimit = 65535

	parametersPerChunk := chunkSize * columnCount
	assert.Less(t, parametersPerChunk, postgresLimit,
		"parameters per chunk (%d) should be less than PostgreSQL limit (%d)",
		parametersPerChunk, postgresLimit)
}

// bulkMockDBSequence tracks call count and returns different results per call.
// Calls beyond the scripted results succeed with zero rows affected.
// NOTE(review): callCount is not synchronized — single-goroutine use only.
type bulkMockDBSequence struct {
	bulkMockDB
	callCount      int
	resultsPerCall []bulkMockCallResult
}

// bulkMockCallResult scripts the outcome of one ExecContext call.
type bulkMockCallResult struct {
	err          error
	rowsAffected int64
}

func (m *bulkMockDBSequence) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	if m.callCount < len(m.resultsPerCall) {
		result := m.resultsPerCall[m.callCount]
		m.callCount++

		if result.err != nil {
			return nil, result.err
		}

		return &bulkMockResult{rowsAffected: result.rowsAffected}, nil
	}

	m.callCount++

	return &bulkMockResult{rowsAffected: 0}, nil
}

// TestCreateBulk_ChunkFailure_PartialResult verifies the documented partial-result
// contract: on a mid-bulk chunk failure, Inserted reflects only committed chunks
// and Ignored stays 0.
func TestCreateBulk_ChunkFailure_PartialResult(t *testing.T) {
	t.Parallel()

	// Create 2001 transactions to trigger 3 chunks (1000 + 1000 + 1)
	transactions := generateTestTransactions(2001)

	// Mock: chunk 1 succeeds (1000 rows), chunk 2 fails
	dbErr := errors.New("database connection lost")
	mockDB := &bulkMockDBSequence{
		resultsPerCall: []bulkMockCallResult{
			{rowsAffected: 1000}, // Chunk 1: success
			{err: dbErr},         // Chunk 2: failure
		},
	}

	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	result, err := repo.CreateBulk(ctx, transactions)

	// Should return error
	require.Error(t, err)
	assert.Equal(t, dbErr, err)

	// Should return partial result
	require.NotNil(t, result)
	assert.Equal(t, int64(2001), result.Attempted, "Attempted should be total count")
	assert.Equal(t, int64(1000), result.Inserted, "Inserted should reflect chunk 1 only")
	assert.Equal(t, int64(0), result.Ignored, "Ignored should be 0 on error (unprocessed items are not duplicates)")
}

// TestCreateBulk_FirstChunkFailure verifies that an immediate failure yields
// a partial result with zero inserts.
func TestCreateBulk_FirstChunkFailure(t *testing.T) {
	t.Parallel()

	transactions := generateTestTransactions(500)

	// Mock: first chunk fails immediately
	dbErr := errors.New("connection refused")
	mockDB := &bulkMockDBSequence{
		resultsPerCall: []bulkMockCallResult{
			{err: dbErr}, // Chunk 1: failure
		},
	}

	ctx := tmcore.ContextWithModulePGConnection(context.Background(), "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	result, err := repo.CreateBulk(ctx, transactions)

	require.Error(t, err)
	assert.Equal(t, dbErr, err)

	require.NotNil(t, result)
	assert.Equal(t, int64(500), result.Attempted)
	assert.Equal(t, int64(0), result.Inserted, "No rows should be inserted when first chunk fails")
	assert.Equal(t, int64(0), result.Ignored, "Ignored should be 0 on error")
}

// TestCreateBulk_ContextCancellation verifies the between-chunk cancellation
// check: a context cancelled before the first chunk short-circuits with a
// partial (zero-insert) result and context.Canceled.
func TestCreateBulk_ContextCancellation(t *testing.T) {
	t.Parallel()

	// Create enough transactions to require multiple chunks
	transactions := generateTestTransactions(2500)

	// Mock: chunk 1 succeeds, then context is cancelled before chunk 2
	mockDB := &bulkMockDBSequence{
		resultsPerCall: []bulkMockCallResult{
			{rowsAffected: 1000}, // Chunk 1: would succeed but context cancelled
		},
	}

	ctx, cancel := context.WithCancel(context.Background())
	ctx = tmcore.ContextWithModulePGConnection(ctx, "transaction", mockDB)

	repo := &TransactionPostgreSQLRepository{
		connection:    nil,
		tableName:     "transaction",
		requireTenant: false,
	}

	// Cancel context before calling CreateBulk
	cancel()

	result, err := repo.CreateBulk(ctx, transactions)

	// Should return context error
	require.Error(t, err)
	assert.ErrorIs(t, err, context.Canceled)

	// Should return partial result (0 since cancelled before first chunk)
	require.NotNil(t, result)
	assert.Equal(t, int64(2500), result.Attempted)
	assert.Equal(t, int64(0), result.Inserted, "No rows inserted when context cancelled before first chunk")
	assert.Equal(t, int64(0), result.Ignored)
}
diff --git a/pkg/repository/bulk.go b/pkg/repository/bulk.go
new file mode 100644
index 000000000..32187c0be
--- /dev/null
+++ b/pkg/repository/bulk.go
@@ -0,0 +1,30 @@
// Copyright (c) 2026 Lerian Studio. All rights reserved.
// Use of this source code is governed by the Elastic License 2.0
// that can be found in the LICENSE file.

// Package repository provides shared types for database repository operations.
package repository

import (
	"context"
	"database/sql"
	"errors"
)

// ErrNilDBExecutor is returned when a nil database executor is provided to bulk operations.
var ErrNilDBExecutor = errors.New("nil database executor provided")

// DBExecutor is a minimal interface satisfied by both dbresolver.DB and dbresolver.Tx.
// This allows bulk insert operations to work with either a direct database connection
// or within an external transaction controlled by the caller.
type DBExecutor interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// BulkInsertResult contains the counts from a bulk insert operation.
// It tracks how many rows were attempted, actually inserted, and ignored (duplicates).
// Invariant (on success): Attempted == Inserted + Ignored.
type BulkInsertResult struct {
	Attempted int64 // Rows sent to INSERT
	Inserted  int64 // Rows actually inserted
	Ignored   int64 // Rows skipped (duplicates via ON CONFLICT DO NOTHING)
}
diff --git a/pkg/utils/metrics.go b/pkg/utils/metrics.go
index ed4ada204..9b25b2056 100644
--- a/pkg/utils/metrics.go
+++ b/pkg/utils/metrics.go
@@ -64,4 +64,69 @@ var (
		Unit:        "1",
		Description: "Total messages processed per tenant.",
	}

	// Bulk Recorder metrics for transaction layer bulk insert operations

	// BulkRecorderTransactionsAttempted counts transactions sent to bulk INSERT.
	BulkRecorderTransactionsAttempted = metrics.Metric{
		Name:        "bulk_recorder_transactions_attempted_total",
		Unit:        "1",
		Description: "Total transactions sent to bulk INSERT.",
	}

	// BulkRecorderTransactionsInserted counts transactions actually inserted.
	BulkRecorderTransactionsInserted = metrics.Metric{
		Name:        "bulk_recorder_transactions_inserted_total",
		Unit:        "1",
		Description: "Total transactions actually inserted (excluding duplicates).",
	}

	// BulkRecorderTransactionsIgnored counts transactions skipped due to duplicates.
	BulkRecorderTransactionsIgnored = metrics.Metric{
		Name:        "bulk_recorder_transactions_ignored_total",
		Unit:        "1",
		Description: "Total transactions skipped (duplicates via ON CONFLICT DO NOTHING).",
	}

	// BulkRecorderOperationsAttempted counts operations sent to bulk INSERT.
	BulkRecorderOperationsAttempted = metrics.Metric{
		Name:        "bulk_recorder_operations_attempted_total",
		Unit:        "1",
		Description: "Total operations sent to bulk INSERT.",
	}

	// BulkRecorderOperationsInserted counts operations actually inserted.
	BulkRecorderOperationsInserted = metrics.Metric{
		Name:        "bulk_recorder_operations_inserted_total",
		Unit:        "1",
		Description: "Total operations actually inserted (excluding duplicates).",
	}

	// BulkRecorderOperationsIgnored counts operations skipped due to duplicates.
	BulkRecorderOperationsIgnored = metrics.Metric{
		Name:        "bulk_recorder_operations_ignored_total",
		Unit:        "1",
		Description: "Total operations skipped (duplicates via ON CONFLICT DO NOTHING).",
	}

	// BulkRecorderBulkSize tracks the number of messages per bulk.
	BulkRecorderBulkSize = metrics.Metric{
		Name:        "bulk_recorder_bulk_size",
		Unit:        "1",
		Description: "Number of messages per bulk processing batch.",
	}

	// BulkRecorderBulkDuration tracks the time taken for each bulk processing.
	BulkRecorderBulkDuration = metrics.Metric{
		Name:        "bulk_recorder_bulk_duration_seconds",
		Unit:        "s",
		Description: "Time taken for bulk processing in seconds.",
	}

	// BulkRecorderFallbackTotal counts fallback activations when bulk fails.
	BulkRecorderFallbackTotal = metrics.Metric{
		Name:        "bulk_recorder_fallback_total",
		Unit:        "1",
		Description: "Total fallback activations when bulk processing fails.",
	}
)