feat: Implement distributed Redis lock before provisioning mqinfra #433
Status: Open · wants to merge 3 commits into main
10 changes: 8 additions & 2 deletions cmd/e2e/configs/basic.go
@@ -10,6 +10,7 @@ import (
"github.com/google/uuid"
"github.com/hookdeck/outpost/internal/config"
"github.com/hookdeck/outpost/internal/infra"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/util/testinfra"
"github.com/hookdeck/outpost/internal/util/testutil"
"github.com/stretchr/testify/require"
@@ -74,10 +75,15 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {

// Setup cleanup
t.Cleanup(func() {
if err := infra.Teardown(context.Background(), infra.Config{
redisClient, err := redis.New(context.Background(), c.Redis.ToConfig())
if err != nil {
log.Println("Failed to create redis client:", err)
}
outpostInfra := infra.NewInfra(infra.Config{
DeliveryMQ: c.MQs.ToInfraConfig("deliverymq"),
LogMQ: c.MQs.ToInfraConfig("logmq"),
}); err != nil {
}, redisClient)
if err := outpostInfra.Teardown(context.Background()); err != nil {
log.Println("Teardown failed:", err)
}
})
13 changes: 10 additions & 3 deletions internal/app/app.go
@@ -13,6 +13,7 @@ import (
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/migrator"
"github.com/hookdeck/outpost/internal/otel"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/services/api"
"github.com/hookdeck/outpost/internal/services/delivery"
"github.com/hookdeck/outpost/internal/services/log"
@@ -53,14 +54,20 @@ func run(mainContext context.Context, cfg *config.Config) error {
return err
}

if err := infra.Declare(mainContext, infra.Config{
redisClient, err := redis.New(mainContext, cfg.Redis.ToConfig())
if err != nil {
return err
}

outpostInfra := infra.NewInfra(infra.Config{
DeliveryMQ: cfg.MQs.ToInfraConfig("deliverymq"),
LogMQ: cfg.MQs.ToInfraConfig("logmq"),
}); err != nil {
}, redisClient)
if err := outpostInfra.Declare(mainContext); err != nil {
return err
}

installationID, err := getInstallation(mainContext, cfg.Redis.ToConfig(), cfg.Telemetry.ToTelemetryConfig())
installationID, err := getInstallation(mainContext, redisClient, cfg.Telemetry.ToTelemetryConfig())
if err != nil {
return err
}
7 changes: 1 addition & 6 deletions internal/app/installation.go
@@ -13,16 +13,11 @@ const (
installationKey = "installation"
)

func getInstallation(ctx context.Context, redisConfig *redis.RedisConfig, telemetryConfig telemetry.TelemetryConfig) (string, error) {
func getInstallation(ctx context.Context, redisClient *redis.Client, telemetryConfig telemetry.TelemetryConfig) (string, error) {
if telemetryConfig.Disabled {
return "", nil
}

redisClient, err := redis.New(ctx, redisConfig)
if err != nil {
return "", err
}

// TODO: consider using WATCH to avoid race condition
// There's a potential race condition when multiple Outpost instances are started at the same time.
// However, given this is for telemetry purposes, and it will be a temporary issue, we can ignore it for now.
167 changes: 149 additions & 18 deletions internal/infra/infra.go
@@ -2,10 +2,32 @@ package infra

import (
"context"
"fmt"
"time"

"github.com/hookdeck/outpost/internal/mqinfra"
"github.com/hookdeck/outpost/internal/redis"
)

const (
lockKey = "outpost:lock"
lockAttempts = 5
lockDelay = 5 * time.Second
lockTTL = 10 * time.Second
)

type Infra struct {
lock Lock
provider InfraProvider
}

// InfraProvider handles the actual infrastructure operations
type InfraProvider interface {
Exist(ctx context.Context) (bool, error)
Declare(ctx context.Context) error
Teardown(ctx context.Context) error
}

type Config struct {
DeliveryMQ *mqinfra.MQInfraConfig
LogMQ *mqinfra.MQInfraConfig
@@ -16,36 +38,145 @@ func (cfg *Config) SetSensiblePolicyDefaults() {
cfg.LogMQ.Policy.RetryLimit = 5
}

func Declare(ctx context.Context, cfg Config) error {
cfg.SetSensiblePolicyDefaults()
type Lock interface {
AttemptLock(ctx context.Context) (bool, error)
Unlock(ctx context.Context) (bool, error)
}

if cfg.DeliveryMQ != nil {
if err := mqinfra.New(cfg.DeliveryMQ).Declare(ctx); err != nil {
return err
}
// infraProvider implements InfraProvider using real MQ infrastructure
type infraProvider struct {
deliveryMQ mqinfra.MQInfra
logMQ mqinfra.MQInfra
}

func (p *infraProvider) Exist(ctx context.Context) (bool, error) {
if exists, err := p.deliveryMQ.Exist(ctx); err != nil {
return false, err
} else if !exists {
return false, nil
}

if cfg.LogMQ != nil {
if err := mqinfra.New(cfg.LogMQ).Declare(ctx); err != nil {
return err
}
if exists, err := p.logMQ.Exist(ctx); err != nil {
return false, err
} else if !exists {
return false, nil
}

return true, nil
}

func (p *infraProvider) Declare(ctx context.Context) error {
if err := p.deliveryMQ.Declare(ctx); err != nil {
return err
}

if err := p.logMQ.Declare(ctx); err != nil {
return err
}

return nil
}

func Teardown(ctx context.Context, cfg Config) error {
if cfg.DeliveryMQ != nil {
if err := mqinfra.New(cfg.DeliveryMQ).TearDown(ctx); err != nil {
return err
}
func (p *infraProvider) Teardown(ctx context.Context) error {
if err := p.deliveryMQ.TearDown(ctx); err != nil {
return err
}

if err := p.logMQ.TearDown(ctx); err != nil {
return err
}

if cfg.LogMQ != nil {
if err := mqinfra.New(cfg.LogMQ).TearDown(ctx); err != nil {
return nil
}

func NewInfra(cfg Config, redisClient *redis.Client) Infra {
cfg.SetSensiblePolicyDefaults()

provider := &infraProvider{
deliveryMQ: mqinfra.New(cfg.DeliveryMQ),
logMQ: mqinfra.New(cfg.LogMQ),
}

return Infra{
lock: NewRedisLock(redisClient),
provider: provider,
}
}

// NewInfraWithProvider creates an Infra instance with custom lock and provider (for testing)
func NewInfraWithProvider(lock Lock, provider InfraProvider) *Infra {
return &Infra{
lock: lock,
provider: provider,
}
}

func (infra *Infra) Declare(ctx context.Context) error {
for attempt := 0; attempt < lockAttempts; attempt++ {
shouldDeclare, hasLocked, err := infra.shouldDeclareAndAcquireLock(ctx)
if err != nil {
return err
}
if !shouldDeclare {
return nil
}

if hasLocked {
// We got the lock, declare infrastructure
defer func() {
// TODO: improve error handling
unlocked, err := infra.lock.Unlock(ctx)
if err != nil {
panic(err)
}
if !unlocked {
panic("failed to unlock lock")
}
}()

if err := infra.provider.Declare(ctx); err != nil {
return err
}

return nil
}

// We didn't get the lock, wait before retry
if attempt < lockAttempts-1 {
time.Sleep(lockDelay)
}
}

return nil
return fmt.Errorf("failed to acquire lock after %d attempts", lockAttempts)
}

func (infra *Infra) Teardown(ctx context.Context) error {
return infra.provider.Teardown(ctx)
}

// shouldDeclareAndAcquireLock checks whether the infra still needs to be declared and, if so, attempts to acquire the lock.
func (infra *Infra) shouldDeclareAndAcquireLock(ctx context.Context) (shouldDeclare bool, hasLocked bool, err error) {
shouldDeclare = false
hasLocked = false
err = nil

exists, err := infra.provider.Exist(ctx)
if err != nil {
err = fmt.Errorf("failed to check if infra exists: %w", err)
return
}
if exists {
// if infra exists, return early, no need to acquire lock
shouldDeclare = false
return
}
shouldDeclare = true

hasLocked, err = infra.lock.AttemptLock(ctx)
if err != nil {
err = fmt.Errorf("failed to acquire lock: %w", err)
return
}

return
}
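The `Lock` implementation returned by `NewRedisLock` is referenced above but is not part of this diff. Below is a minimal sketch of what such a lock might look like, assuming the internal Redis client is (or wraps) a go-redis v9 client; the type and constructor names are hypothetical, and the SET NX + compare-and-delete approach is an assumption rather than the PR's actual implementation.

```go
package infra

import (
	"context"

	"github.com/google/uuid"
	goredis "github.com/redis/go-redis/v9" // assumption: the internal redis client wraps go-redis v9
)

// redisLockSketch is a hypothetical Lock implementation. It acquires the lock
// with SET NX plus a TTL and releases it with a compare-and-delete script so
// that one instance cannot release a lock held by another.
type redisLockSketch struct {
	client *goredis.Client
	token  string // unique per process; proves ownership on unlock
}

func newRedisLockSketch(client *goredis.Client) *redisLockSketch {
	return &redisLockSketch{client: client, token: uuid.New().String()}
}

// AttemptLock sets lockKey only if it does not already exist, using the lockTTL
// defined above, and reports whether this instance now holds the lock.
func (l *redisLockSketch) AttemptLock(ctx context.Context) (bool, error) {
	return l.client.SetNX(ctx, lockKey, l.token, lockTTL).Result()
}

// unlockScript deletes the lock key only if it still holds this instance's token.
var unlockScript = goredis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

// Unlock releases the lock and reports whether the key was actually deleted.
func (l *redisLockSketch) Unlock(ctx context.Context) (bool, error) {
	deleted, err := unlockScript.Run(ctx, l.client, []string{lockKey}, l.token).Int()
	if err != nil {
		return false, err
	}
	return deleted == 1, nil
}
```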
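`NewInfraWithProvider` makes the locking flow in `Declare` testable without Redis or a real message queue. A sketch of how it might be exercised with stub implementations follows; the stub types and the test itself are illustrative, not part of the PR.

```go
package infra_test

import (
	"context"
	"testing"

	"github.com/hookdeck/outpost/internal/infra"
)

// stubLock always grants the lock and records whether it was released.
type stubLock struct{ unlocked bool }

func (l *stubLock) AttemptLock(ctx context.Context) (bool, error) { return true, nil }
func (l *stubLock) Unlock(ctx context.Context) (bool, error)      { l.unlocked = true; return true, nil }

// stubProvider reports that the infra does not exist yet and counts Declare calls.
type stubProvider struct{ declared int }

func (p *stubProvider) Exist(ctx context.Context) (bool, error) { return false, nil }
func (p *stubProvider) Declare(ctx context.Context) error       { p.declared++; return nil }
func (p *stubProvider) Teardown(ctx context.Context) error      { return nil }

func TestDeclareAcquiresLockThenDeclares(t *testing.T) {
	lock := &stubLock{}
	provider := &stubProvider{}

	i := infra.NewInfraWithProvider(lock, provider)
	if err := i.Declare(context.Background()); err != nil {
		t.Fatalf("Declare: %v", err)
	}
	if provider.declared != 1 {
		t.Fatalf("expected exactly one Declare call, got %d", provider.declared)
	}
	if !lock.unlocked {
		t.Fatal("expected the lock to be released after declaring")
	}
}
```

Because `Exist` is checked before the lock is attempted, an instance that finds the queues already provisioned returns early without ever touching the lock.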