Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/ledger/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM --platform=$BUILDPLATFORM golang:1.25.8-alpine AS builder
# ci: force rebuild to sync ledger image with beta.10
# ci: force rebuild

WORKDIR /ledger-app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type BalanceSyncWorker struct {
multiTenantEnabled bool
tenantClient *tmclient.Client
pgManager *tmpostgres.Manager
serviceName string
}

func NewBalanceSyncWorker(conn *libRedis.Client, logger libLog.Logger, useCase *command.UseCase, maxWorkers int) *BalanceSyncWorker {
Expand All @@ -61,6 +62,8 @@ func NewBalanceSyncWorker(conn *libRedis.Client, logger libLog.Logger, useCase *
// When multiTenantEnabled is true, both tenantClient and pgManager must be non-nil for the worker
// to be considered ready (isMultiTenantReady). The worker uses tenantClient to discover active
// tenants and pgManager to resolve per-tenant PostgreSQL connections.
// serviceName is the service identifier used to query active tenants from the Tenant Manager
// (e.g. the value of APPLICATION_NAME). It must not be empty when multi-tenant is enabled.
func NewBalanceSyncWorkerMultiTenant(
conn *libRedis.Client,
logger libLog.Logger,
Expand All @@ -69,11 +72,13 @@ func NewBalanceSyncWorkerMultiTenant(
multiTenantEnabled bool,
tenantClient *tmclient.Client,
pgManager *tmpostgres.Manager,
serviceName string,
) *BalanceSyncWorker {
w := NewBalanceSyncWorker(conn, logger, useCase, maxWorkers)
w.multiTenantEnabled = multiTenantEnabled
w.tenantClient = tenantClient
w.pgManager = pgManager
w.serviceName = serviceName

return w
}
Expand Down Expand Up @@ -189,7 +194,7 @@ func (w *BalanceSyncWorker) runMultiTenant() error {
// Returns (tenants, true) on success, (nil, false) if an error or empty result requires
// backing off and retrying, or (nil, true) if shutdown was requested during backoff.
func (w *BalanceSyncWorker) discoverActiveTenants(ctx context.Context, rds redis.UniversalClient) ([]*tmclient.TenantSummary, bool) {
tenants, err := w.tenantClient.GetActiveTenantsByService(ctx, "transaction")
tenants, err := w.tenantClient.GetActiveTenantsByService(ctx, w.serviceName)
if err != nil {
w.logger.Log(ctx, libLog.LevelError, fmt.Sprintf("BalanceSyncWorker: failed to get active tenants: %v", err))

Expand Down
20 changes: 16 additions & 4 deletions components/transaction/internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@
}

// initBalanceSyncWorker creates the balance sync worker (multi-tenant or single-tenant).
func initBalanceSyncWorker(opts *Options, cfg *Config, logger libLog.Logger, commandUC *command.UseCase, redisConn *libRedis.Client, pgManager *tmpostgres.Manager) *BalanceSyncWorker {
// tenantServiceName is the pre-validated service identifier for the Tenant Manager;
// it is only used when multi-tenant mode is active.
func initBalanceSyncWorker(opts *Options, cfg *Config, logger libLog.Logger, commandUC *command.UseCase, redisConn *libRedis.Client, pgManager *tmpostgres.Manager, tenantServiceName string) *BalanceSyncWorker {
const defaultBalanceSyncMaxWorkers = 5

balanceSyncMaxWorkers := cfg.BalanceSyncMaxWorkers
Expand All @@ -368,7 +370,7 @@
var balanceSyncWorker *BalanceSyncWorker

if opts != nil && opts.MultiTenantEnabled {
balanceSyncWorker = NewBalanceSyncWorkerMultiTenant(redisConn, logger, commandUC, balanceSyncMaxWorkers, true, opts.TenantClient, pgManager)
balanceSyncWorker = NewBalanceSyncWorkerMultiTenant(redisConn, logger, commandUC, balanceSyncMaxWorkers, true, opts.TenantClient, pgManager, tenantServiceName)
} else {
balanceSyncWorker = NewBalanceSyncWorker(redisConn, logger, commandUC, balanceSyncMaxWorkers)
}
Expand All @@ -379,7 +381,7 @@
}

// InitServersWithOptions initiates http and grpc servers with optional dependency injection.
func InitServersWithOptions(opts *Options) (*Service, error) {

Check failure on line 384 in components/transaction/internal/bootstrap/config.go

View workflow job for this annotation

GitHub Actions / Go Analysis / Lint (midaz-transaction)

cyclomatic complexity 19 of func `InitServersWithOptions` is high (> 18) (gocyclo)
cfg := &Config{}

if err := libCommons.SetConfigFromEnvVars(cfg); err != nil {
Expand All @@ -394,6 +396,16 @@
// BALANCE_SYNC_WORKER_ENABLED is deprecated - balance sync is always enabled
logger.Log(context.Background(), libLog.LevelInfo, "BalanceSyncWorker: always enabled (BALANCE_SYNC_WORKER_ENABLED env var is deprecated)")

// Validate TenantServiceName early so that workers fail fast on misconfiguration
// instead of silently backing off when the Tenant Manager returns no tenants.
var tenantServiceName string
if opts != nil && opts.MultiTenantEnabled {
tenantServiceName = strings.TrimSpace(opts.TenantServiceName)
if tenantServiceName == "" {
return nil, fmt.Errorf("TenantServiceName must not be empty when multi-tenant is enabled")
}
}

telemetry, err := libOpentelemetry.NewTelemetry(libOpentelemetry.TelemetryConfig{
LibraryName: cfg.OtelLibraryName,
ServiceName: cfg.OtelServiceName,
Expand Down Expand Up @@ -495,13 +507,13 @@
// RedisQueueConsumer: multi-tenant or single-tenant
var redisConsumer *RedisQueueConsumer
if opts != nil && opts.MultiTenantEnabled {
redisConsumer = NewRedisQueueConsumerMultiTenant(logger, *h.transaction, true, opts.TenantClient, pg.pgManager)
redisConsumer = NewRedisQueueConsumerMultiTenant(logger, *h.transaction, true, opts.TenantClient, pg.pgManager, tenantServiceName)
} else {
redisConsumer = NewRedisQueueConsumer(logger, *h.transaction)
}

// BalanceSyncWorker: multi-tenant or single-tenant
balanceSyncWorker := initBalanceSyncWorker(opts, cfg, logger, commandUseCase, redisConnection, pg.pgManager)
balanceSyncWorker := initBalanceSyncWorker(opts, cfg, logger, commandUseCase, redisConnection, pg.pgManager, tenantServiceName)

return &Service{
Server: server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
multiTenantEnabled bool
tenantClient *tmclient.Client
pgManager *tmpostgres.Manager
serviceName string
}

func NewRedisQueueConsumer(logger libLog.Logger, handler in.TransactionHandler) *RedisQueueConsumer {
Expand All @@ -57,17 +58,21 @@
// When multiTenantEnabled is true, both tenantClient and pgManager must be non-nil for the consumer
// to be considered ready (isMultiTenantReady). The consumer uses tenantClient to discover active
// tenants and pgManager to resolve per-tenant PostgreSQL connections.
// serviceName is the service identifier used to query active tenants from the Tenant Manager
// (e.g. the value of APPLICATION_NAME). It must not be empty when multi-tenant is enabled.
func NewRedisQueueConsumerMultiTenant(
logger libLog.Logger,
handler in.TransactionHandler,
multiTenantEnabled bool,
tenantClient *tmclient.Client,
pgManager *tmpostgres.Manager,
serviceName string,
) *RedisQueueConsumer {
c := NewRedisQueueConsumer(logger, handler)
c.multiTenantEnabled = multiTenantEnabled
c.tenantClient = tenantClient
c.pgManager = pgManager
c.serviceName = serviceName

return c
}
Expand Down Expand Up @@ -131,7 +136,7 @@
return nil

case <-ticker.C:
tenants, err := r.tenantClient.GetActiveTenantsByService(ctx, "transaction")
tenants, err := r.tenantClient.GetActiveTenantsByService(ctx, r.serviceName)
if err != nil {
r.Logger.Log(ctx, libLog.LevelError, fmt.Sprintf("RedisQueueConsumer: failed to get active tenants: %v", err))

Expand Down Expand Up @@ -238,7 +243,7 @@

// processMessage handles a single Redis backup queue message: acquires a distributed lock,
// rebuilds balances and operations, and writes the transaction via the async path.
func (r *RedisQueueConsumer) processMessage(ctx context.Context, key string, m mmodel.TransactionRedisQueue) {

Check failure on line 246 in components/transaction/internal/bootstrap/redis.consumer.go

View workflow job for this annotation

GitHub Actions / Go Analysis / Lint (midaz-transaction)

cyclomatic complexity 20 of func `(*RedisQueueConsumer).processMessage` is high (> 18) (gocyclo)
_, tracer, _, _ := libCommons.NewTrackingFromContext(ctx) //nolint:dogsled

msgCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,31 @@ import (
"github.com/LerianStudio/midaz/v3/components/transaction/internal/services/command"
)

// FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers fuzzes the maxWorkers parameter
// and multiTenantEnabled flag of the NewBalanceSyncWorkerMultiTenant constructor.
// FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers fuzzes the maxWorkers parameter,
// multiTenantEnabled flag, and serviceName of the NewBalanceSyncWorkerMultiTenant constructor.
//
// Properties verified (not specific values):
// 1. Constructor never panics for any int/bool combination.
// 1. Constructor never panics for any int/bool/string combination.
// 2. Returned worker is never nil.
// 3. maxWorkers is always > 0 after construction (default applied for <= 0).
// 4. multiTenantEnabled matches the input value.
// 5. isMultiTenantReady() is consistent with field state.
// 6. serviceName is stored exactly as provided (no implicit trimming).
func FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers(f *testing.F) {
// Seed 1: typical valid input
f.Add(5, true)
f.Add(5, true, "transaction")
// Seed 2: zero maxWorkers (boundary -- triggers default)
f.Add(0, false)
f.Add(0, false, "ledger")
// Seed 3: negative maxWorkers (boundary -- triggers default)
f.Add(-1, true)
f.Add(-1, true, "")
// Seed 4: large maxWorkers (stress)
f.Add(math.MaxInt32, false)
f.Add(math.MaxInt32, false, " ")
// Seed 5: minimum int (extreme negative boundary)
f.Add(math.MinInt32, true)
f.Add(math.MinInt32, true, " ledger ")
// Seed 6: one worker (minimum valid)
f.Add(1, false)
f.Add(1, false, "transaction")
// Seed 7: large negative (another extreme)
f.Add(-1000000, false)
f.Add(-1000000, false, "my-service")

logger := newTestLogger()
conn := &libRedis.Client{}
Expand All @@ -70,9 +71,9 @@ func FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers(f *testing.F) {
}
pgMgr := tmpostgres.NewManager(tenantClient, "transaction", tmpostgres.WithLogger(logger))

f.Fuzz(func(t *testing.T, maxWorkers int, multiTenantEnabled bool) {
f.Fuzz(func(t *testing.T, maxWorkers int, multiTenantEnabled bool, serviceName string) {
// Property: constructor must never panic (enforced by test execution).
worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, maxWorkers, multiTenantEnabled, tenantClient, pgMgr)
worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, maxWorkers, multiTenantEnabled, tenantClient, pgMgr, serviceName)

// Property: returned worker is never nil.
if worker == nil {
Expand All @@ -89,6 +90,11 @@ func FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers(f *testing.F) {
t.Fatalf("multiTenantEnabled mismatch: got %v, want %v", worker.multiTenantEnabled, multiTenantEnabled)
}

// Property: serviceName is stored exactly as provided.
if worker.serviceName != serviceName {
t.Fatalf("serviceName mismatch: got %q, want %q", worker.serviceName, serviceName)
}

// Property: isMultiTenantReady() is true only when enabled AND pgManager AND tenantClient non-nil.
ready := worker.isMultiTenantReady()
if multiTenantEnabled && worker.pgManager != nil && worker.tenantClient != nil && !ready {
Expand All @@ -107,24 +113,25 @@ func FuzzNewBalanceSyncWorkerMultiTenant_MaxWorkers(f *testing.F) {
}

// FuzzNewRedisQueueConsumerMultiTenant_MultiTenantEnabled fuzzes the
// multiTenantEnabled flag of the NewRedisQueueConsumerMultiTenant constructor.
// multiTenantEnabled flag and serviceName of the NewRedisQueueConsumerMultiTenant constructor.
//
// Properties verified:
// 1. Constructor never panics.
// 2. Returned consumer is never nil.
// 3. multiTenantEnabled matches input.
// 4. isMultiTenantReady() is consistent with field state.
// 5. serviceName is stored exactly as provided.
func FuzzNewRedisQueueConsumerMultiTenant_MultiTenantEnabled(f *testing.F) {
// Seed 1: enabled with pgManager (typical multi-tenant)
f.Add(true, true)
f.Add(true, true, "transaction")
// Seed 2: disabled with no pgManager (typical single-tenant)
f.Add(false, false)
f.Add(false, false, "ledger")
// Seed 3: enabled without pgManager (fallback case)
f.Add(true, false)
f.Add(true, false, "")
// Seed 4: disabled with pgManager (misconfiguration edge case)
f.Add(false, true)
// Seed 5: enabled with pgManager (duplicate to ensure stability)
f.Add(true, true)
f.Add(false, true, " ")
// Seed 5: enabled with pgManager, different service name
f.Add(true, true, " ledger ")

logger := newTestLogger()
handler := in.TransactionHandler{}
Expand All @@ -134,14 +141,14 @@ func FuzzNewRedisQueueConsumerMultiTenant_MultiTenantEnabled(f *testing.F) {
}
pgMgr := tmpostgres.NewManager(tenantClient, "transaction", tmpostgres.WithLogger(logger))

f.Fuzz(func(t *testing.T, multiTenantEnabled bool, hasPGManager bool) {
f.Fuzz(func(t *testing.T, multiTenantEnabled bool, hasPGManager bool, serviceName string) {
var mgr *tmpostgres.Manager
if hasPGManager {
mgr = pgMgr
}

// Property: constructor must never panic.
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, multiTenantEnabled, tenantClient, mgr)
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, multiTenantEnabled, tenantClient, mgr, serviceName)

// Property: returned consumer is never nil.
if consumer == nil {
Expand All @@ -153,6 +160,11 @@ func FuzzNewRedisQueueConsumerMultiTenant_MultiTenantEnabled(f *testing.F) {
t.Fatalf("multiTenantEnabled mismatch: got %v, want %v", consumer.multiTenantEnabled, multiTenantEnabled)
}

// Property: serviceName is stored exactly as provided.
if consumer.serviceName != serviceName {
t.Fatalf("serviceName mismatch: got %q, want %q", consumer.serviceName, serviceName)
}

// Property: isMultiTenantReady() follows the predicate logic.
ready := consumer.isMultiTenantReady()
expectReady := multiTenantEnabled && mgr != nil && consumer.tenantClient != nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestProperty_NewBalanceSyncWorkerMultiTenant_PreservesBaseFields(t *testing
maxWorkers = -maxBound
}

worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, maxWorkers, enabled, tenantClient, pgMgr)
worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, maxWorkers, enabled, tenantClient, pgMgr, "transaction")

// Property: constructor never returns nil.
if worker == nil {
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestProperty_NewRedisQueueConsumerMultiTenant_PreservesBaseFields(t *testin
mgr = pgMgr
}

consumer := NewRedisQueueConsumerMultiTenant(logger, handler, enabled, tenantClient, mgr)
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, enabled, tenantClient, mgr, "transaction")

// Property: constructor never returns nil.
if consumer == nil {
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestProperty_MultiTenantConstructors_NeverPanic(t *testing.T) {
// Note: logger is always non-nil because the base constructor calls
// logger methods; passing nil logger would panic in production too,
// but that is a caller contract, not a multi-tenant invariant.
worker := NewBalanceSyncWorkerMultiTenant(wConn, logger, wUseCase, 0, workerEnabled, wTenantClient, wPGManager)
worker := NewBalanceSyncWorkerMultiTenant(wConn, logger, wUseCase, 0, workerEnabled, wTenantClient, wPGManager, "transaction")
if worker == nil {
return false
}
Expand All @@ -375,7 +375,7 @@ func TestProperty_MultiTenantConstructors_NeverPanic(t *testing.T) {
}

// Property: NewRedisQueueConsumerMultiTenant must never panic.
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, consumerEnabled, cTenantClient, cPGManager)
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, consumerEnabled, cTenantClient, cPGManager, "transaction")
if consumer == nil {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestNewBalanceSyncWorkerMultiTenant(t *testing.T) {
require.NoError(t, err)
pgMgr := tmpostgres.NewManager(tenantClient, "transaction", tmpostgres.WithLogger(logger))

worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, 5, true, tenantClient, pgMgr)
worker := NewBalanceSyncWorkerMultiTenant(conn, logger, useCase, 5, true, tenantClient, pgMgr, "transaction")

require.NotNil(t, worker, "worker should not be nil")
assert.True(t, worker.multiTenantEnabled,
Expand All @@ -231,6 +231,8 @@ func TestNewBalanceSyncWorkerMultiTenant(t *testing.T) {
"pgManager should be the same instance")
assert.Equal(t, 5, worker.maxWorkers,
"maxWorkers should be set correctly")
assert.Equal(t, "transaction", worker.serviceName,
"serviceName should be set correctly")
}

// TestNewRedisQueueConsumerMultiTenant verifies the multi-tenant-aware
Expand All @@ -244,7 +246,7 @@ func TestNewRedisQueueConsumerMultiTenant(t *testing.T) {
require.NoError(t, err)
pgMgr := tmpostgres.NewManager(tenantClient, "transaction", tmpostgres.WithLogger(logger))

consumer := NewRedisQueueConsumerMultiTenant(logger, handler, true, tenantClient, pgMgr)
consumer := NewRedisQueueConsumerMultiTenant(logger, handler, true, tenantClient, pgMgr, "transaction")

require.NotNil(t, consumer, "consumer should not be nil")
assert.True(t, consumer.multiTenantEnabled,
Expand All @@ -253,6 +255,8 @@ func TestNewRedisQueueConsumerMultiTenant(t *testing.T) {
"tenantClient should be the same instance")
assert.Same(t, pgMgr, consumer.pgManager,
"pgManager should be the same instance")
assert.Equal(t, "transaction", consumer.serviceName,
"serviceName should be set correctly")
}

// TestRabbitMQConsumerHandlerReceivesPGManager verifies that the
Expand Down Expand Up @@ -498,7 +502,7 @@ func TestNewBalanceSyncWorkerMultiTenant_EdgeCases(t *testing.T) {

worker := NewBalanceSyncWorkerMultiTenant(
conn, logger, useCase, 5,
tt.multiTenantEnabled, tt.tenantClient, tt.pgManager,
tt.multiTenantEnabled, tt.tenantClient, tt.pgManager, "transaction",
)

require.NotNil(t, worker, "constructor must return non-nil")
Expand Down Expand Up @@ -566,7 +570,7 @@ func TestNewRedisQueueConsumerMultiTenant_EdgeCases(t *testing.T) {

consumer := NewRedisQueueConsumerMultiTenant(
logger, handler,
tt.multiTenantEnabled, tt.tenantClient, tt.pgManager,
tt.multiTenantEnabled, tt.tenantClient, tt.pgManager, "transaction",
)

require.NotNil(t, consumer, "constructor must return non-nil")
Expand Down Expand Up @@ -907,7 +911,7 @@ func TestBalanceSyncWorker_MultiTenantConstructorPreservesRunBehavior(t *testing

worker := NewBalanceSyncWorkerMultiTenant(
conn, logger, useCase, 5,
tt.multiTenantEnabled, tenantClient, tt.pgManager,
tt.multiTenantEnabled, tenantClient, tt.pgManager, "transaction",
)

require.NotNil(t, worker, "constructor must return non-nil worker")
Expand Down
Loading