Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/LerianStudio/midaz/v3/pkg"
"github.com/LerianStudio/midaz/v3/pkg/constant"
"github.com/LerianStudio/midaz/v3/pkg/net/http"
"github.com/LerianStudio/midaz/v3/pkg/repository"
"github.com/Masterminds/squirrel"
"github.com/bxcodec/dbresolver/v2"
"github.com/google/uuid"
Expand All @@ -44,6 +46,8 @@ type OperationFilter struct {

type Repository interface {
Create(ctx context.Context, operation *Operation) (*Operation, error)
CreateBulk(ctx context.Context, operations []*Operation) (*repository.BulkInsertResult, error)
CreateBulkTx(ctx context.Context, tx repository.DBExecutor, operations []*Operation) (*repository.BulkInsertResult, error)
FindAll(ctx context.Context, organizationID, ledgerID, transactionID uuid.UUID, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error)
FindAllByAccount(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, opFilter OperationFilter, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error)
Find(ctx context.Context, organizationID, ledgerID, transactionID, id uuid.UUID) (*Operation, error)
Expand Down Expand Up @@ -250,6 +254,209 @@ func (r *OperationPostgreSQLRepository) Create(ctx context.Context, operation *O
return record.ToEntity(), nil
}

// CreateBulk persists a batch of operations via multi-row INSERT with
// ON CONFLICT DO NOTHING, reporting attempted/inserted/ignored counts.
//
// Duplicates (by ID) are silently skipped, which makes retries idempotent.
// The batch is chunked to respect PostgreSQL's bind-parameter ceiling, and
// each chunk commits independently: if a later chunk fails, earlier chunks
// stay committed and the partial result is returned with the error (only
// Inserted is reliable on error; Ignored remains 0 because unprocessed rows
// are not known to be duplicates).
//
// NOTE: the input slice is re-sorted in place by ID to keep concurrent bulk
// writers lock-ordered; callers must not rely on the original element order.
func (r *OperationPostgreSQLRepository) CreateBulk(ctx context.Context, operations []*Operation) (*repository.BulkInsertResult, error) {
	logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

	ctx, span := tracer.Start(ctx, "postgres.create_bulk_operations")
	defer span.End()

	// Nothing to insert: skip the connection acquisition entirely.
	if len(operations) == 0 {
		return &repository.BulkInsertResult{}, nil
	}

	conn, err := r.getDB(ctx)
	if err != nil {
		libOpentelemetry.HandleSpanError(span, "Failed to get database connection", err)
		logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to get database connection: %v", err))

		return nil, err
	}

	return r.createBulkInternal(ctx, conn, operations, "postgres.create_bulk_operations_internal", "")
}

// CreateBulkTx is the transactional variant of CreateBulk: it runs the same
// chunked multi-row INSERT (with ON CONFLICT DO NOTHING and ID-sorted input)
// against a caller-supplied executor, so several tables can be written
// atomically under one transaction.
//
// The caller owns the transaction lifecycle — Commit or Rollback is never
// called here. On error the partial result is returned (only Inserted is
// reliable; Ignored stays 0) and the caller should roll back.
//
// NOTE: the input slice is re-sorted in place by ID; callers must not rely
// on the original element order afterwards.
func (r *OperationPostgreSQLRepository) CreateBulkTx(ctx context.Context, tx repository.DBExecutor, operations []*Operation) (*repository.BulkInsertResult, error) {
	// Guard against a nil executor before any work is attempted.
	if tx == nil {
		return nil, repository.ErrNilDBExecutor
	}

	return r.createBulkInternal(ctx, tx, operations, "postgres.create_bulk_operations_tx", " (tx)")
}

// createBulkInternal is the shared engine behind CreateBulk and CreateBulkTx:
// it rejects nil entries, orders the batch by ID, and issues chunked
// multi-row inserts through the supplied executor. On failure the
// partially-filled result is returned alongside the error.
func (r *OperationPostgreSQLRepository) createBulkInternal(
	ctx context.Context,
	db repository.DBExecutor,
	operations []*Operation,
	spanName string,
	logSuffix string,
) (*repository.BulkInsertResult, error) {
	logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

	ctx, span := tracer.Start(ctx, spanName)
	defer span.End()

	if len(operations) == 0 {
		return &repository.BulkInsertResult{}, nil
	}

	// A nil element would panic inside the sort comparator or during model
	// conversion, so reject the whole batch up front.
	for idx := range operations {
		if operations[idx] == nil {
			err := fmt.Errorf("nil operation at index %d", idx)
			libOpentelemetry.HandleSpanError(span, "Invalid input: nil operation", err)

			return nil, err
		}
	}

	// Deterministic lock ordering: concurrent bulk writers touching
	// overlapping IDs acquire row locks in the same sequence, avoiding
	// deadlocks.
	sort.Slice(operations, func(a, b int) bool {
		return operations[a].ID < operations[b].ID
	})

	res := &repository.BulkInsertResult{Attempted: int64(len(operations))}

	// Chunk into bulks of ~1,000 rows to stay within PostgreSQL's parameter
	// limit: Operation has 30 columns, so 1000 rows = 30,000 parameters
	// (under the 65,535 limit).
	const chunkSize = 1000

	for start := 0; start < len(operations); start += chunkSize {
		// Bail out between chunks if the caller has given up.
		select {
		case <-ctx.Done():
			libOpentelemetry.HandleSpanError(span, "Context cancelled during bulk insert", ctx.Err())
			logger.Log(ctx, libLog.LevelWarn, fmt.Sprintf("Context cancelled during bulk insert: %v", ctx.Err()))

			// Return partial result; Ignored stays 0 since remaining items were not processed
			return res, ctx.Err()
		default:
		}

		stop := min(start+chunkSize, len(operations))

		inserted, err := r.insertOperationChunk(ctx, db, operations[start:stop])
		if err != nil {
			libOpentelemetry.HandleSpanError(span, "Failed to insert operation chunk", err)
			logger.Log(ctx, libLog.LevelError, fmt.Sprintf("Failed to insert operation chunk: %v", err))

			// Return partial result; Ignored stays 0 since remaining items were not processed (not duplicates)
			return res, err
		}

		res.Inserted += inserted
	}

	// Anything attempted but not inserted was deduplicated by ON CONFLICT.
	res.Ignored = res.Attempted - res.Inserted

	logger.Log(ctx, libLog.LevelInfo, fmt.Sprintf("Bulk insert operations%s: attempted=%d, inserted=%d, ignored=%d",
		logSuffix, res.Attempted, res.Inserted, res.Ignored))

	return res, nil
}

// insertOperationChunk inserts a chunk of operations using multi-row INSERT.
// Uses repository.DBExecutor to work with both dbresolver.DB and dbresolver.Tx.
func (r *OperationPostgreSQLRepository) insertOperationChunk(ctx context.Context, db repository.DBExecutor, operations []*Operation) (int64, error) {
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)

ctx, span := tracer.Start(ctx, "postgres.insert_operation_chunk")
defer span.End()

logger.Log(ctx, libLog.LevelDebug, fmt.Sprintf("Inserting chunk of %d operations", len(operations)))

builder := squirrel.Insert(r.tableName).
Columns(operationColumnList...).
PlaceholderFormat(squirrel.Dollar)

for _, op := range operations {
record := &OperationPostgreSQLModel{}
record.FromEntity(op)

builder = builder.Values(
record.ID,
record.TransactionID,
record.Description,
record.Type,
record.AssetCode,
record.Amount,
record.AvailableBalance,
record.OnHoldBalance,
record.AvailableBalanceAfter,
record.OnHoldBalanceAfter,
record.Status,
record.StatusDescription,
record.AccountID,
record.AccountAlias,
record.BalanceID,
record.ChartOfAccounts,
record.OrganizationID,
record.LedgerID,
record.CreatedAt,
record.UpdatedAt,
record.DeletedAt,
record.Route,
record.BalanceAffected,
record.BalanceKey,
record.VersionBalance,
record.VersionBalanceAfter,
record.Direction,
record.RouteID,
record.RouteCode,
record.RouteDescription,
)
}

builder = builder.Suffix("ON CONFLICT (id) DO NOTHING")

query, args, err := builder.ToSql()
if err != nil {
libOpentelemetry.HandleSpanError(span, "Failed to build bulk insert query", err)

return 0, err
}

execResult, err := db.ExecContext(ctx, query, args...)
if err != nil {
libOpentelemetry.HandleSpanError(span, "Failed to execute bulk insert", err)

return 0, err
}

rowsAffected, err := execResult.RowsAffected()
if err != nil {
libOpentelemetry.HandleSpanError(span, "Failed to get rows affected", err)

return 0, err
}

return rowsAffected, nil
}

// FindAll retrieves Operations entities from the database.
func (r *OperationPostgreSQLRepository) FindAll(ctx context.Context, organizationID, ledgerID, transactionID uuid.UUID, filter http.Pagination) ([]*Operation, libHTTP.CursorPagination, error) {
logger, tracer, _, _ := libCommons.NewTrackingFromContext(ctx)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading