diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index fc3be6ad..c7f10269 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -276,6 +276,18 @@ func main() { // Registration BEFORE Connect so the attempt counter is available // to OnConnectAttempt on the very first dial. liveness.IsConnectedFn = client.IsConnected + // #1335: wire force-reconnect so the watchdog can drop a + // half-open TCP socket and re-dial when paho.IsConnected==true + // but no messages have flowed past the stall threshold. Throttled + // per source by the watchdog itself (forceReconnectThrottle). + // Disconnect(250) gives in-flight publishes 250ms to drain; + // Connect() returns immediately and paho's reconnect machinery + // takes over from there. Captured-by-value `client` is the same + // pointer used everywhere else for this source. + liveness.ForceReconnectFn = func() { + client.Disconnect(250) + client.Connect() + } // PR #1216 r2 item 3: tag collisions used to log.Fatalf, which // killed the entire ingestor over one config typo and recreated // the #1212 total-ingest-stop class this PR exists to prevent. @@ -371,7 +383,16 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions { SetOrderMatters(true). SetMaxReconnectInterval(30 * time.Second). SetConnectTimeout(10 * time.Second). - SetWriteTimeout(10 * time.Second) + SetWriteTimeout(10 * time.Second). + // #1335: MQTT keepalive (PINGREQ/PINGRESP) surfaces a half-open + // socket within ~30-60s instead of waiting for the application-level + // watchdog (5m) to notice no messages. SetKeepAlive sets paho's + // PINGREQ interval, not TCP SO_KEEPALIVE — if the broker's PINGRESP + // doesn't arrive, ConnectionLost fires and auto-reconnect kicks in. + // Previously unset, so paho's 30s default applied; set explicitly + // so it can't drift, and so operators reading the code know it's + // intentional per the #1335 RCA. 
+ SetKeepAlive(30 * time.Second) opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { // Look up the per-source liveness state (registered in main) so we diff --git a/cmd/ingestor/mqtt_watchdog.go b/cmd/ingestor/mqtt_watchdog.go index 20f989fc..899ff4c5 100644 --- a/cmd/ingestor/mqtt_watchdog.go +++ b/cmd/ingestor/mqtt_watchdog.go @@ -14,6 +14,10 @@ import ( // shift, infrequent enough not to spam ops chat. const livenessHeartbeatInterval = time.Hour +// forceReconnectThrottle is the minimum interval between forced +// reconnects on the SAME source. See processLivenessTransition. +const forceReconnectThrottle = 60 * time.Second + // LivenessKind enumerates the watchdog verdicts for a source. Edge-triggered // transitions use this to decide whether to emit (and what severity). type LivenessKind int @@ -63,6 +67,22 @@ type SourceLivenessState struct { StartedAt int64 // atomic; unix seconds when the source was registered / last reconnected (transient-stall tracking) LastAlertUnix int64 // atomic; unix seconds of last emit (WARN or heartbeat); 0 means quiet IsConnectedFn func() bool + // ForceReconnectFn (#1335) is called by the watchdog when a source + // transitions INTO LivenessStalled. It must force the paho client + // to drop its current TCP socket and re-establish (typically + // client.Disconnect(250) followed by client.Connect()). Half-open + // TCP sockets (Azure NAT idle timeout) report IsConnected==true so + // paho's own auto-reconnect never fires; this is the recovery path. + // May be nil (tests, or sources registered before wiring); the + // watchdog must treat that as a safe no-op. Invocations are + // throttled at forceReconnectThrottle per source so a + // stall→reconnect→re-stall loop self-recovers without hammering + // the broker. 
+ ForceReconnectFn func() + // LastForceReconnectUnix is the unix-seconds timestamp of the most + // recent forced reconnect for this source; the watchdog reads it + // to enforce forceReconnectThrottle. atomic. + LastForceReconnectUnix int64 // AttemptCount is incremented on every TCP/TLS connection attempt. Used // by ConnectionAttemptHandler to log attempt # independent of paho's // internal reconnect-loop state. atomic. @@ -272,12 +292,30 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st // First detection — fire WARN edge. emit(msg) atomic.StoreInt64(&s.LastAlertUnix, now.Unix()) + // #1335: ONLY LivenessStalled (paho reports connected but no + // messages past threshold — classic half-open TCP) gets + // force-reconnected. LivenessNeverReceived is almost always + // an ACL deny / wrong channel hash — a new TCP socket won't + // fix it and would just churn the broker. The distinct + // "NEVER received" alarm is the right operator signal for + // that class. + if kind == LivenessStalled { + maybeForceReconnect(s, now, emit) + } return } // Already alerted; only re-emit on heartbeat interval to avoid log flood. if now.Sub(time.Unix(lastAlert, 0)) >= livenessHeartbeatInterval { emit(fmt.Sprintf("MQTT [%s] WATCHDOG heartbeat: still stalled — %s", s.Tag, msg)) atomic.StoreInt64(&s.LastAlertUnix, now.Unix()) + // Heartbeat re-emit on a still-Stalled source: try another + // force-reconnect IF the throttle window has elapsed. Under + // a persistent broker issue this caps at one attempt per + // heartbeat (1h) — orders of magnitude under any rate + // limit and well within "don't hammer the broker". 
+ if kind == LivenessStalled { + maybeForceReconnect(s, now, emit) + } } case LivenessOK: if lastAlert != 0 { @@ -294,3 +332,31 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st } } +// maybeForceReconnect invokes ForceReconnectFn IFF (a) one is wired and +// (b) the throttle window (forceReconnectThrottle) has elapsed since +// the most recent forced reconnect for this source. Logs WATCHDOG +// telemetry before/after so operators can correlate the reconnect with +// downstream paho ConnectionAttempt/OnConnect lines. +func maybeForceReconnect(s *SourceLivenessState, now time.Time, emit func(...any)) { + if s.ForceReconnectFn == nil { + return + } + lastForce := atomic.LoadInt64(&s.LastForceReconnectUnix) + if lastForce != 0 && now.Sub(time.Unix(lastForce, 0)) < forceReconnectThrottle { + emit(fmt.Sprintf("MQTT [%s] WATCHDOG suppressing forced reconnect (last attempt %s ago, throttle %s)", + s.Tag, now.Sub(time.Unix(lastForce, 0)).Round(time.Second), forceReconnectThrottle)) + return + } + atomic.StoreInt64(&s.LastForceReconnectUnix, now.Unix()) + emit(fmt.Sprintf("MQTT [%s] WATCHDOG forcing reconnect (half-open TCP suspected — paho.IsConnected==true but no messages)", s.Tag)) + // Run in a goroutine: ForceReconnectFn typically calls + // client.Disconnect(250) which blocks up to 250ms, then + // client.Connect() which can block on the connect timeout. The + // watchdog goroutine must not stall a per-tick scan over a single + // slow source. 
+ go func() { + s.ForceReconnectFn() + emit(fmt.Sprintf("MQTT [%s] WATCHDOG reconnect attempt issued", s.Tag)) + }() +} + diff --git a/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go b/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go new file mode 100644 index 00000000..5a5bc1ec --- /dev/null +++ b/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go @@ -0,0 +1,174 @@ +package main + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +// Issue #1335 — staging's lincomatic source stalls: paho reports +// IsConnected==true but no messages arrive for 1h+. The PR #1216 +// watchdog DETECTS this (LivenessStalled) but only LOGS — it never +// forces paho to drop the half-open TCP socket and reconnect, so the +// source stays silently broken until container restart. +// +// Fix: on transition INTO LivenessStalled, invoke a per-source +// ForceReconnectFn (wired in main.go to client.Disconnect(250) + +// client.Connect()). Throttled by forceReconnectThrottle so a +// stall→reconnect→re-stall loop self-recovers without hammering the +// broker. + +// RED on master: ForceReconnectFn is never invoked because the +// transition engine does not call it. After the fix, the WARN edge on +// LivenessStalled MUST fire force-reconnect exactly once. 
+func TestMQTTStallWatchdog_ForceReconnectOnStallEdge(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "stalled-half-open", + Broker: "tcp://halfopen.example:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + atomic.StoreInt64(&s.LastMessageUnix, now.Add(-10*time.Minute).Unix()) + atomic.StoreInt64(&s.StartedAt, now.Add(-20*time.Minute).Unix()) + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + var mu sync.Mutex + var emits []string + emit := func(args ...any) { + mu.Lock() + defer mu.Unlock() + if len(args) > 0 { + if str, ok := args[0].(string); ok { + emits = append(emits, str) + } + } + } + + processLivenessTransition(s, LivenessStalled, "10m silent", now, emit) + + // ForceReconnectFn runs in a goroutine (the production code can't + // block the watchdog tick on a slow Disconnect+Connect). Wait + // briefly for it to land before asserting. + waitForReconnect(t, &reconnectCount, 1, 2*time.Second) + + if got := reconnectCount.Load(); got != 1 { + t.Fatalf("LivenessStalled transition MUST force-reconnect exactly once; got %d invocations (emits=%v)", got, emits) + } +} + +// Throttle: a second LivenessStalled transition within the throttle +// window MUST NOT fire a second reconnect (no broker hammering). +func TestMQTTStallWatchdog_ForceReconnectThrottled(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "throttled", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + emit := func(args ...any) {} + + // First stall edge → fires. 
+ processLivenessTransition(s, LivenessStalled, "stall 1", now, emit) + waitForReconnect(t, &reconnectCount, 1, 2*time.Second) + // Simulate paho reconnect cycle: MarkReconnected clears the alert + // cooldown, then the source goes stalled again 5s later. + s.MarkReconnected(now.Add(5 * time.Second)) + processLivenessTransition(s, LivenessStalled, "stall 2", now.Add(10*time.Second), emit) + // Give a stray goroutine a chance to land (it shouldn't, due to throttle). + time.Sleep(100 * time.Millisecond) + + if got := reconnectCount.Load(); got != 1 { + t.Fatalf("force-reconnect MUST be throttled within %s; got %d invocations", forceReconnectThrottle, got) + } + + // After the throttle window, a fresh stall edge MAY fire again. + s.MarkReconnected(now.Add(30 * time.Second)) + processLivenessTransition(s, LivenessStalled, "stall 3", now.Add(forceReconnectThrottle+30*time.Second), emit) + waitForReconnect(t, &reconnectCount, 2, 2*time.Second) + if got := reconnectCount.Load(); got != 2 { + t.Fatalf("after throttle window, force-reconnect must re-arm; got %d invocations", got) + } +} + +// NeverReceived (cold-start ACL-deny / never-flowed) MUST NOT +// force-reconnect. A SUBSCRIBE ACL deny is not fixed by a new TCP +// socket; reconnecting just churns the broker. Operators get the +// distinct "NEVER received" alarm so they can address the ACL. +func TestMQTTStallWatchdog_NoForceReconnectOnNeverReceived(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "acl-denied", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + emit := func(args ...any) {} + processLivenessTransition(s, LivenessNeverReceived, "no msgs ever", now, emit) + // Settle any (incorrect) goroutine before counting. 
+ time.Sleep(100 * time.Millisecond) + + if got := reconnectCount.Load(); got != 0 { + t.Fatalf("LivenessNeverReceived must NOT force-reconnect (likely ACL deny — TCP churn won't help); got %d invocations", got) + } +} + +// Safety: a source with no ForceReconnectFn wired (e.g. tests, or a +// source registered before the wiring was added) MUST NOT panic when +// LivenessStalled fires. +func TestMQTTStallWatchdog_NilForceReconnectFnIsSafe(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + s := &SourceLivenessState{ + Tag: "no-reconnect-fn", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + // ForceReconnectFn deliberately nil. + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + defer func() { + if r := recover(); r != nil { + t.Fatalf("nil ForceReconnectFn must be a safe no-op; panicked: %v", r) + } + }() + processLivenessTransition(s, LivenessStalled, "stalled", now, func(args ...any) {}) +} + +// waitForReconnect polls reconnectCount until it reaches `want` or the +// deadline elapses. ForceReconnectFn runs in a goroutine in production +// (Disconnect+Connect can block on broker IO), so tests can't read the +// counter synchronously. +func waitForReconnect(t *testing.T, count *atomic.Int32, want int32, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if count.Load() >= want { + return + } + time.Sleep(5 * time.Millisecond) + } +}