Skip to content

Fail hard on disconnects from mqtt and postgres #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 9, 2025
3 changes: 2 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
services:
dind:
image: docker:23.0-rc-dind-rootless
image: docker:rc-dind-rootless
ports:
- 2375:2375
steps:
Expand All @@ -40,6 +40,7 @@ jobs:
go-version: 1.24.3
- name: Run tests with Docker and calculate coverage
run: |
export GITHUB_ACTIONS=1
export POSTGRES_CONTAINER=1
export VERNEMQ_CONTAINER=1
go test -v \
Expand Down
21 changes: 21 additions & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-gorp/gorp"
_ "github.com/lib/pq"
"github.com/sapcc/go-bits/easypg"
"github.com/sapcc/go-bits/jobloop"
)

// Wrapper around gorp.DbMap that adds some convenience functions.
Expand Down Expand Up @@ -72,6 +73,26 @@ func NewPostgresDB(c conf.DBConfig, registry *monitoring.Registry) DB {
return DB{DbMap: dbMap}
}

// Check periodically if the database is alive. If not, panic.
func (d *DB) CheckLivenessPeriodically() {
var failures int
for {
if err := d.Db.Ping(); err != nil {
if failures > 5 {
slog.Error("database is unreachable, giving up", "error", err)
panic(err)
}
slog.Error("failed to ping database", "error", err)
time.Sleep(jobloop.DefaultJitter(1 * time.Second))
failures++
continue
}
failures = 0
slog.Debug("check ok: database is reachable")
time.Sleep(jobloop.DefaultJitter(10 * time.Second))
}
}

// Adds missing functionality to gorp.DbMap which creates one table.
func (d *DB) CreateTable(table ...*gorp.TableMap) error {
tx, err := d.Begin()
Expand Down
1 change: 1 addition & 0 deletions internal/features/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func TestFeatureExtractorPipeline_ExtractOnTrigger(t *testing.T) {

mqttConf := conf.MQTTConfig{URL: "tcp://localhost:" + container.GetPort()}
mqttClient := mqtt.NewClientWithConfig(mqttConf)
defer mqttClient.Disconnect()

// Mock feature extractors
var wg sync.WaitGroup
Expand Down
28 changes: 15 additions & 13 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func NewClientWithConfig(conf conf.MQTTConfig) Client {
return &client{conf: conf, lock: &sync.Mutex{}}
}

// Called when the connection to the mqtt broker is lost.
func (t *client) onUnexpectedConnectionLoss(client mqtt.Client, err error) {
panic(err)
}

// Connect to the mqtt broker.
func (t *client) Connect() error {
if t.client != nil {
Expand All @@ -48,21 +53,13 @@ func (t *client) Connect() error {
opts := mqtt.NewClientOptions()
opts.AddBroker(t.conf.URL)
opts.SetConnectTimeout(10 * time.Second)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetConnectRetry(false)
opts.SetKeepAlive(60 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetOnConnectHandler(func(client mqtt.Client) {
slog.Info("connected to mqtt broker")
})
opts.SetAutoReconnect(true)
opts.SetResumeSubs(true)
opts.SetCleanSession(false)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
slog.Error("mqtt connection lost", "err", err)
})
opts.SetCleanSession(true)
opts.SetConnectionLostHandler(t.onUnexpectedConnectionLoss)
//nolint:gosec // We don't care if the client id is cryptographically secure.
opts.SetClientID(fmt.Sprintf("cortex-scheduler-%d", rand.Intn(1_000_000)))
opts.SetClientID(fmt.Sprintf("cortex-%d", rand.Intn(1_000_000)))
opts.SetOrderMatters(false)
opts.SetProtocolVersion(4)
opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
Expand Down Expand Up @@ -139,7 +136,12 @@ func (t *client) Disconnect() {
return
}
client := *t.client
client.Disconnect(1000)
t.client = nil
// Note: the disconnect will run in a goroutine.
client.Disconnect(1000)
// Wait for the disconnect to finish.
for client.IsConnected() {
time.Sleep(100 * time.Millisecond)
}
slog.Info("disconnected from mqtt broker")
}
10 changes: 9 additions & 1 deletion internal/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,29 @@ func TestConnect(t *testing.T) {
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
c.Disconnect()
}

func TestPublish(t *testing.T) {
if os.Getenv("VERNEMQ_CONTAINER") != "1" {
t.Skip("skipping test; set VERNEMQ_CONTAINER=1 to run")
}
// FIXME: It seems like GitHub Actions kills the container on the publish call.
if os.Getenv("GITHUB_ACTIONS") == "1" {
t.Skip("skipping test; GITHUB_ACTIONS=1")
}

container := containers.VernemqContainer{}
container.Init(t)
defer container.Close()
conf := conf.MQTTConfig{URL: "tcp://localhost:" + container.GetPort()}
c := client{conf: conf, lock: &sync.Mutex{}}

err := c.publish("test/topic", map[string]string{"key": "value"})
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
t.Log("published message to test/topic")
c.Disconnect()
}

func TestSubscribe(t *testing.T) {
Expand All @@ -62,6 +68,7 @@ func TestSubscribe(t *testing.T) {
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
c.Disconnect()
}

func TestDisconnect(t *testing.T) {
Expand All @@ -79,4 +86,5 @@ func TestDisconnect(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
c.Disconnect()
c.Disconnect() // Should do nothing (already disconnected)
}
15 changes: 8 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ func main() {
registry := monitoring.NewRegistry(monitoringConfig)
go runMonitoringServer(ctx, registry, monitoringConfig)

dbInstance := db.NewPostgresDB(config.GetDBConfig(), registry)
defer dbInstance.Close()
database := db.NewPostgresDB(config.GetDBConfig(), registry)
defer database.Close()
go database.CheckLivenessPeriodically()

migrater := db.NewMigrater(dbInstance)
migrater := db.NewMigrater(database)
migrater.Migrate(true)

// Run an api server that serves some basic endpoints and can be extended.
Expand All @@ -153,13 +154,13 @@ func main() {

switch taskName {
case "syncer":
runSyncer(ctx, registry, config.GetSyncConfig(), dbInstance)
runSyncer(ctx, registry, config.GetSyncConfig(), database)
case "extractor":
runExtractor(registry, config.GetFeaturesConfig(), dbInstance)
runExtractor(registry, config.GetFeaturesConfig(), database)
case "scheduler":
runScheduler(mux, registry, config.GetSchedulerConfig(), dbInstance)
runScheduler(mux, registry, config.GetSchedulerConfig(), database)
case "kpis":
runKPIService(registry, config.GetKPIsConfig(), dbInstance)
runKPIService(registry, config.GetKPIsConfig(), database)
default:
panic("unknown task")
}
Expand Down
2 changes: 2 additions & 0 deletions testlib/mqtt/containers/vernemq.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c VernemqContainer) GetPort() string {
}

func (c *VernemqContainer) Init(t *testing.T) {
log.Println("starting vernemq container")
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("could not construct pool: %s", err)
Expand Down Expand Up @@ -66,6 +67,7 @@ func (c *VernemqContainer) Init(t *testing.T) {
if conn := client.Connect(); conn.Wait() && conn.Error() != nil {
panic(conn.Error())
}
log.Println("vernemq container is ready")
}

func (c *VernemqContainer) Close() {
Expand Down
Loading