diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 6d345a5..a8ac168 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -28,7 +28,7 @@ jobs:
     runs-on: ubuntu-latest
     services:
       dind:
-        image: docker:23.0-rc-dind-rootless
+        image: docker:rc-dind-rootless
         ports:
           - 2375:2375
     steps:
@@ -40,6 +40,7 @@ jobs:
           go-version: 1.24.3
       - name: Run tests with Docker and calculate coverage
         run: |
+          export GITHUB_ACTIONS=1
           export POSTGRES_CONTAINER=1
           export VERNEMQ_CONTAINER=1
           go test -v \
diff --git a/internal/db/db.go b/internal/db/db.go
index ec257d0..0c737ea 100644
--- a/internal/db/db.go
+++ b/internal/db/db.go
@@ -17,6 +17,7 @@ import (
 	"github.com/go-gorp/gorp"
 	_ "github.com/lib/pq"
 	"github.com/sapcc/go-bits/easypg"
+	"github.com/sapcc/go-bits/jobloop"
 )
 
 // Wrapper around gorp.DbMap that adds some convenience functions.
@@ -72,6 +73,30 @@ func NewPostgresDB(c conf.DBConfig, registry *monitoring.Registry) DB {
 	return DB{DbMap: dbMap}
 }
 
+// CheckLivenessPeriodically pings the database in an endless loop. After a
+// successful ping it sleeps for a jittered 10 seconds before pinging again.
+// After a failed ping it retries after a jittered 1 second and panics on the
+// seventh consecutive failure. The call never returns, so it is meant to be
+// started in its own goroutine.
+func (d *DB) CheckLivenessPeriodically() {
+	var failures int
+	for {
+		if err := d.Db.Ping(); err != nil {
+			if failures > 5 {
+				slog.Error("database is unreachable, giving up", "error", err)
+				panic(err)
+			}
+			slog.Error("failed to ping database", "error", err)
+			time.Sleep(jobloop.DefaultJitter(1 * time.Second))
+			failures++
+			continue
+		}
+		failures = 0
+		slog.Debug("check ok: database is reachable")
+		time.Sleep(jobloop.DefaultJitter(10 * time.Second))
+	}
+}
+
 // Adds missing functionality to gorp.DbMap which creates one table.
 func (d *DB) CreateTable(table ...*gorp.TableMap) error {
 	tx, err := d.Begin()
diff --git a/internal/features/pipeline_test.go b/internal/features/pipeline_test.go
index 5d99f07..10e0886 100644
--- a/internal/features/pipeline_test.go
+++ b/internal/features/pipeline_test.go
@@ -197,6 +197,7 @@ func TestFeatureExtractorPipeline_ExtractOnTrigger(t *testing.T) {
 
 	mqttConf := conf.MQTTConfig{URL: "tcp://localhost:" + container.GetPort()}
 	mqttClient := mqtt.NewClientWithConfig(mqttConf)
+	defer mqttClient.Disconnect()
 
 	// Mock feature extractors
 	var wg sync.WaitGroup
diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go
index db64699..f5fa552 100644
--- a/internal/mqtt/mqtt.go
+++ b/internal/mqtt/mqtt.go
@@ -38,6 +38,12 @@ func NewClientWithConfig(conf conf.MQTTConfig) Client {
 	return &client{conf: conf, lock: &sync.Mutex{}}
 }
 
+// onUnexpectedConnectionLoss is the connection-lost handler registered with
+// the mqtt client options. It panics with the error reported by the library.
+func (t *client) onUnexpectedConnectionLoss(_ mqtt.Client, err error) {
+	panic(err)
+}
+
 // Connect to the mqtt broker.
 func (t *client) Connect() error {
 	if t.client != nil {
@@ -48,21 +54,13 @@ func (t *client) Connect() error {
 	opts := mqtt.NewClientOptions()
 	opts.AddBroker(t.conf.URL)
 	opts.SetConnectTimeout(10 * time.Second)
-	opts.SetConnectRetry(true)
-	opts.SetConnectRetryInterval(5 * time.Second)
+	opts.SetConnectRetry(false)
 	opts.SetKeepAlive(60 * time.Second)
 	opts.SetPingTimeout(10 * time.Second)
-	opts.SetOnConnectHandler(func(client mqtt.Client) {
-		slog.Info("connected to mqtt broker")
-	})
-	opts.SetAutoReconnect(true)
-	opts.SetResumeSubs(true)
-	opts.SetCleanSession(false)
-	opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
-		slog.Error("mqtt connection lost", "err", err)
-	})
+	opts.SetCleanSession(true)
+	opts.SetConnectionLostHandler(t.onUnexpectedConnectionLoss)
 	//nolint:gosec // We don't care if the client id is cryptographically secure.
-	opts.SetClientID(fmt.Sprintf("cortex-scheduler-%d", rand.Intn(1_000_000)))
+	opts.SetClientID(fmt.Sprintf("cortex-%d", rand.Intn(1_000_000)))
 	opts.SetOrderMatters(false)
 	opts.SetProtocolVersion(4)
 	opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
@@ -139,7 +137,18 @@ func (t *client) Disconnect() {
 		return
 	}
 	client := *t.client
-	client.Disconnect(1000)
 	t.client = nil
+	// Note: the disconnect will run in a goroutine.
+	client.Disconnect(1000)
+	// Wait for the disconnect to finish, but give up after a few seconds so
+	// that a connection which never closes cannot block the caller forever.
+	deadline := time.Now().Add(5 * time.Second)
+	for client.IsConnected() && time.Now().Before(deadline) {
+		time.Sleep(100 * time.Millisecond)
+	}
+	if client.IsConnected() {
+		slog.Warn("timed out waiting for mqtt disconnect")
+		return
+	}
 	slog.Info("disconnected from mqtt broker")
 }
diff --git a/internal/mqtt/mqtt_test.go b/internal/mqtt/mqtt_test.go
index a3a0d91..1c6b288 100644
--- a/internal/mqtt/mqtt_test.go
+++ b/internal/mqtt/mqtt_test.go
@@ -28,23 +28,29 @@ func TestConnect(t *testing.T) {
 	if err != nil {
 		t.Fatalf("expected no error, got %v", err)
 	}
+	c.Disconnect()
 }
 
 func TestPublish(t *testing.T) {
 	if os.Getenv("VERNEMQ_CONTAINER") != "1" {
 		t.Skip("skipping test; set VERNEMQ_CONTAINER=1 to run")
 	}
+	// FIXME: It seems like GitHub Actions kills the container on the publish call.
+	if os.Getenv("GITHUB_ACTIONS") == "1" {
+		t.Skip("skipping test; GITHUB_ACTIONS=1")
+	}
 	container := containers.VernemqContainer{}
 	container.Init(t)
 	defer container.Close()
 
 	conf := conf.MQTTConfig{URL: "tcp://localhost:" + container.GetPort()}
 	c := client{conf: conf, lock: &sync.Mutex{}}
-
 	err := c.publish("test/topic", map[string]string{"key": "value"})
 	if err != nil {
 		t.Fatalf("expected no error, got %v", err)
 	}
+	t.Log("published message to test/topic")
+	c.Disconnect()
 }
 
 func TestSubscribe(t *testing.T) {
@@ -62,6 +68,7 @@ func TestSubscribe(t *testing.T) {
 	if err != nil {
 		t.Fatalf("expected no error, got %v", err)
 	}
+	c.Disconnect()
 }
 
 func TestDisconnect(t *testing.T) {
@@ -79,4 +86,5 @@ func TestDisconnect(t *testing.T) {
 		t.Fatalf("expected no error, got %v", err)
 	}
 	c.Disconnect()
+	c.Disconnect() // Should do nothing (already disconnected)
 }
diff --git a/main.go b/main.go
index 15eaabd..c9b3a29 100644
--- a/main.go
+++ b/main.go
@@ -139,10 +139,11 @@ func main() {
 	registry := monitoring.NewRegistry(monitoringConfig)
 	go runMonitoringServer(ctx, registry, monitoringConfig)
 
-	dbInstance := db.NewPostgresDB(config.GetDBConfig(), registry)
-	defer dbInstance.Close()
+	database := db.NewPostgresDB(config.GetDBConfig(), registry)
+	defer database.Close()
+	go database.CheckLivenessPeriodically()
 
-	migrater := db.NewMigrater(dbInstance)
+	migrater := db.NewMigrater(database)
 	migrater.Migrate(true)
 
 	// Run an api server that serves some basic endpoints and can be extended.
@@ -153,13 +154,13 @@ func main() {
 
 	switch taskName {
 	case "syncer":
-		runSyncer(ctx, registry, config.GetSyncConfig(), dbInstance)
+		runSyncer(ctx, registry, config.GetSyncConfig(), database)
 	case "extractor":
-		runExtractor(registry, config.GetFeaturesConfig(), dbInstance)
+		runExtractor(registry, config.GetFeaturesConfig(), database)
 	case "scheduler":
-		runScheduler(mux, registry, config.GetSchedulerConfig(), dbInstance)
+		runScheduler(mux, registry, config.GetSchedulerConfig(), database)
 	case "kpis":
-		runKPIService(registry, config.GetKPIsConfig(), dbInstance)
+		runKPIService(registry, config.GetKPIsConfig(), database)
 	default:
 		panic("unknown task")
 	}
diff --git a/testlib/mqtt/containers/vernemq.go b/testlib/mqtt/containers/vernemq.go
index bbccc83..68b159b 100644
--- a/testlib/mqtt/containers/vernemq.go
+++ b/testlib/mqtt/containers/vernemq.go
@@ -25,6 +25,7 @@ func (c VernemqContainer) GetPort() string {
 }
 
 func (c *VernemqContainer) Init(t *testing.T) {
+	log.Println("starting vernemq container")
 	pool, err := dockertest.NewPool("")
 	if err != nil {
 		log.Fatalf("could not construct pool: %s", err)
@@ -66,6 +67,7 @@ func (c *VernemqContainer) Init(t *testing.T) {
 	if conn := client.Connect(); conn.Wait() && conn.Error() != nil {
 		panic(conn.Error())
 	}
+	log.Println("vernemq container is ready")
 }
 
 func (c *VernemqContainer) Close() {