From f0688743a7ff702fb4e7abcf87ee5e9071430dbf Mon Sep 17 00:00:00 2001 From: openclaw-bot Date: Sat, 23 May 2026 22:31:14 +0000 Subject: [PATCH 1/2] =?UTF-8?q?test(mqtt-watchdog):=20RED=20=E2=80=94=20fo?= =?UTF-8?q?rce-reconnect=20on=20LivenessStalled,=20throttled?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds failing tests for #1335: when checkSourceLiveness returns LivenessStalled (paho.IsConnected==true but no messages past threshold — half-open TCP), processLivenessTransition must invoke a per-source ForceReconnectFn so paho drops the dead socket and re-dials. Also asserts: - throttle: a second stall within forceReconnectThrottle (60s) does NOT fire a second reconnect (no broker hammering) - NeverReceived MUST NOT force-reconnect (likely ACL deny — TCP churn won't help; the distinct cold-start alarm is the right signal) - nil ForceReconnectFn is a safe no-op Compiles, fails on assertion. GREEN commit follows. --- cmd/ingestor/mqtt_watchdog.go | 20 ++ .../mqtt_watchdog_force_reconnect_test.go | 174 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 cmd/ingestor/mqtt_watchdog_force_reconnect_test.go diff --git a/cmd/ingestor/mqtt_watchdog.go b/cmd/ingestor/mqtt_watchdog.go index 20f989fc..8d4b22d8 100644 --- a/cmd/ingestor/mqtt_watchdog.go +++ b/cmd/ingestor/mqtt_watchdog.go @@ -14,6 +14,10 @@ import ( // shift, infrequent enough not to spam ops chat. const livenessHeartbeatInterval = time.Hour +// forceReconnectThrottle is the minimum interval between forced +// reconnects on the SAME source. See processLivenessTransition. +const forceReconnectThrottle = 60 * time.Second + // LivenessKind enumerates the watchdog verdicts for a source. Edge-triggered // transitions use this to decide whether to emit (and what severity). type LivenessKind int @@ -63,6 +67,22 @@ type SourceLivenessState struct { StartedAt int64 // atomic; unix seconds when the source was registered / last reconnected (transient-stall tracking) LastAlertUnix int64 // atomic; unix seconds of last emit (WARN or heartbeat); 0 means quiet IsConnectedFn func() bool + // ForceReconnectFn (#1335) is called by the watchdog when a source + // transitions INTO LivenessStalled. It must force the paho client + // to drop its current TCP socket and re-establish (typically + // client.Disconnect(250) followed by client.Connect()). Half-open + // TCP sockets (Azure NAT idle timeout) report IsConnected==true so + // paho's own auto-reconnect never fires; this is the recovery path. + // May be nil (tests, or sources registered before wiring); the + // watchdog must treat that as a safe no-op. Invocations are + // throttled at forceReconnectThrottle per source so a + // stall→reconnect→re-stall loop self-recovers without hammering + // the broker. + ForceReconnectFn func() + // LastForceReconnectUnix is the unix-seconds timestamp of the most + // recent forced reconnect for this source; the watchdog reads it + // to enforce forceReconnectThrottle. atomic. + LastForceReconnectUnix int64 // AttemptCount is incremented on every TCP/TLS connection attempt. Used // by ConnectionAttemptHandler to log attempt # independent of paho's // internal reconnect-loop state. atomic. diff --git a/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go b/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go new file mode 100644 index 00000000..5a5bc1ec --- /dev/null +++ b/cmd/ingestor/mqtt_watchdog_force_reconnect_test.go @@ -0,0 +1,174 @@ +package main + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +// Issue #1335 — staging's lincomatic source stalls: paho reports +// IsConnected==true but no messages arrive for 1h+. The PR #1216 +// watchdog DETECTS this (LivenessStalled) but only LOGS — it never +// forces paho to drop the half-open TCP socket and reconnect, so the +// source stays silently broken until container restart. +// +// Fix: on transition INTO LivenessStalled, invoke a per-source +// ForceReconnectFn (wired in main.go to client.Disconnect(250) + +// client.Connect()). Throttled by forceReconnectThrottle so a +// stall→reconnect→re-stall loop self-recovers without hammering the +// broker. + +// RED on master: ForceReconnectFn is never invoked because the +// transition engine does not call it. After the fix, the WARN edge on +// LivenessStalled MUST fire force-reconnect exactly once. +func TestMQTTStallWatchdog_ForceReconnectOnStallEdge(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "stalled-half-open", + Broker: "tcp://halfopen.example:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + atomic.StoreInt64(&s.LastMessageUnix, now.Add(-10*time.Minute).Unix()) + atomic.StoreInt64(&s.StartedAt, now.Add(-20*time.Minute).Unix()) + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + var mu sync.Mutex + var emits []string + emit := func(args ...any) { + mu.Lock() + defer mu.Unlock() + if len(args) > 0 { + if str, ok := args[0].(string); ok { + emits = append(emits, str) + } + } + } + + processLivenessTransition(s, LivenessStalled, "10m silent", now, emit) + + // ForceReconnectFn runs in a goroutine (the production code can't + // block the watchdog tick on a slow Disconnect+Connect). Wait + // briefly for it to land before asserting. + waitForReconnect(t, &reconnectCount, 1, 2*time.Second) + + if got := reconnectCount.Load(); got != 1 { + t.Fatalf("LivenessStalled transition MUST force-reconnect exactly once; got %d invocations (emits=%v)", got, emits) + } +} + +// Throttle: a second LivenessStalled transition within the throttle +// window MUST NOT fire a second reconnect (no broker hammering). +func TestMQTTStallWatchdog_ForceReconnectThrottled(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "throttled", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + emit := func(args ...any) {} + + // First stall edge → fires. + processLivenessTransition(s, LivenessStalled, "stall 1", now, emit) + waitForReconnect(t, &reconnectCount, 1, 2*time.Second) + // Simulate paho reconnect cycle: MarkReconnected clears the alert + // cooldown, then the source goes stalled again 5s later. + s.MarkReconnected(now.Add(5 * time.Second)) + processLivenessTransition(s, LivenessStalled, "stall 2", now.Add(10*time.Second), emit) + // Give a stray goroutine a chance to land (it shouldn't, due to throttle). + time.Sleep(100 * time.Millisecond) + + if got := reconnectCount.Load(); got != 1 { + t.Fatalf("force-reconnect MUST be throttled within %s; got %d invocations", forceReconnectThrottle, got) + } + + // After the throttle window, a fresh stall edge MAY fire again. + s.MarkReconnected(now.Add(30 * time.Second)) + processLivenessTransition(s, LivenessStalled, "stall 3", now.Add(forceReconnectThrottle+30*time.Second), emit) + waitForReconnect(t, &reconnectCount, 2, 2*time.Second) + if got := reconnectCount.Load(); got != 2 { + t.Fatalf("after throttle window, force-reconnect must re-arm; got %d invocations", got) + } +} + +// NeverReceived (cold-start ACL-deny / never-flowed) MUST NOT +// force-reconnect. A SUBSCRIBE ACL deny is not fixed by a new TCP +// socket; reconnecting just churns the broker. Operators get the +// distinct "NEVER received" alarm so they can address the ACL. +func TestMQTTStallWatchdog_NoForceReconnectOnNeverReceived(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + var reconnectCount atomic.Int32 + s := &SourceLivenessState{ + Tag: "acl-denied", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + ForceReconnectFn: func() { reconnectCount.Add(1) }, + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + + emit := func(args ...any) {} + processLivenessTransition(s, LivenessNeverReceived, "no msgs ever", now, emit) + // Settle any (incorrect) goroutine before counting. + time.Sleep(100 * time.Millisecond) + + if got := reconnectCount.Load(); got != 0 { + t.Fatalf("LivenessNeverReceived must NOT force-reconnect (likely ACL deny — TCP churn won't help); got %d invocations", got) + } +} + +// Safety: a source with no ForceReconnectFn wired (e.g. tests, or a +// source registered before the wiring was added) MUST NOT panic when +// LivenessStalled fires. +func TestMQTTStallWatchdog_NilForceReconnectFnIsSafe(t *testing.T) { + defer snapshotAndResetRegistry(t)() + + now := time.Now() + s := &SourceLivenessState{ + Tag: "no-reconnect-fn", + Broker: "tcp://x:1883", + IsConnectedFn: func() bool { return true }, + // ForceReconnectFn deliberately nil. + } + if err := registerLivenessState(s); err != nil { + t.Fatalf("setup: %v", err) + } + defer func() { + if r := recover(); r != nil { + t.Fatalf("nil ForceReconnectFn must be a safe no-op; panicked: %v", r) + } + }() + processLivenessTransition(s, LivenessStalled, "stalled", now, func(args ...any) {}) +} + +// waitForReconnect polls reconnectCount until it reaches `want` or the +// deadline elapses. ForceReconnectFn runs in a goroutine in production +// (Disconnect+Connect can block on broker IO), so tests can't read the +// counter synchronously. +func waitForReconnect(t *testing.T, count *atomic.Int32, want int32, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if count.Load() >= want { + return + } + time.Sleep(5 * time.Millisecond) + } +} From 8f53c10c912181be49bb8f6da7af34c3d99790ca Mon Sep 17 00:00:00 2001 From: openclaw-bot Date: Sat, 23 May 2026 22:34:50 +0000 Subject: [PATCH 2/2] =?UTF-8?q?fix(mqtt):=20watchdog=20forces=20paho=20rec?= =?UTF-8?q?onnect=20on=20stall=20=E2=80=94=20recovers=20half-open=20TCP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #1335: PR #1216 added per-source stall DETECTION but only logged. Staging's lincomatic source has been losing ~14k pkts/hr behind a half-open TCP socket the Azure NAT silently abandons: paho reports IsConnected==true, no messages arrive for an hour, container restart is the only known recovery. Prod (MikroTik networking) doesn't see it. Changes (all additive — no behavior change for currently-flowing sources): - SourceLivenessState.ForceReconnectFn: per-source closure that wraps client.Disconnect(250) + client.Connect(). Wired in main.go right next to IsConnectedFn. - processLivenessTransition: on the LivenessStalled edge AND on every heartbeat re-emit while still Stalled, invoke maybeForceReconnect. LivenessNeverReceived (cold-start ACL deny / wrong hash) is deliberately NOT force-reconnected — a new TCP socket won't fix an ACL deny and would just churn the broker. - maybeForceReconnect: throttled at forceReconnectThrottle (60s) per source so a stall→reconnect→re-stall loop self-recovers without hammering the broker. Runs the Disconnect+Connect in a goroutine so a single slow source can't stall the watchdog tick. - buildMQTTOpts: explicit SetKeepAlive(30s). paho's default happens to be 30s but #1335 RCA called this out specifically — making it explicit so it can't drift, and so operators reading the code know it's intentional. - WATCHDOG telemetry: 'forcing reconnect' (intent), 'reconnect attempt issued' (post-goroutine), 'suppressing forced reconnect' (throttle). Test mqtt_watchdog_force_reconnect_test.go covers: stall edge fires, throttle suppresses second attempt within 60s, throttle re-arms after, NeverReceived does NOT force-reconnect, nil ForceReconnectFn is safe. --- cmd/ingestor/main.go | 23 +++++++++++++++++- cmd/ingestor/mqtt_watchdog.go | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index fc3be6ad..c7f10269 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -276,6 +276,18 @@ func main() { // Registration BEFORE Connect so the attempt counter is available // to OnConnectAttempt on the very first dial. liveness.IsConnectedFn = client.IsConnected + // #1335: wire force-reconnect so the watchdog can drop a + // half-open TCP socket and re-dial when paho.IsConnected==true + // but no messages have flowed past the stall threshold. Throttled + // per source by the watchdog itself (forceReconnectThrottle). + // Disconnect(250) gives in-flight publishes 250ms to drain; + // Connect() returns immediately and paho's reconnect machinery + // takes over from there. Captured-by-value `client` is the same + // pointer used everywhere else for this source. + liveness.ForceReconnectFn = func() { + client.Disconnect(250) + client.Connect() + } // PR #1216 r2 item 3: tag collisions used to log.Fatalf, which // killed the entire ingestor over one config typo and recreated // the #1212 total-ingest-stop class this PR exists to prevent. @@ -371,7 +383,16 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions { SetOrderMatters(true). SetMaxReconnectInterval(30 * time.Second). SetConnectTimeout(10 * time.Second). - SetWriteTimeout(10 * time.Second) + SetWriteTimeout(10 * time.Second). + // #1335: TCP-level keepalive surfaces a half-open socket within + // ~30-60s instead of waiting for the application-level watchdog + // (5m) to notice no messages. paho's MQTT PINGREQ uses this + // interval too — if the broker's PINGRESP doesn't arrive, + // ConnectionLost fires and auto-reconnect kicks in. Was unset + // (paho default 30s actually — making this explicit so it can't + // drift, and so operators reading the code know it's intentional + // per the #1335 RCA). + SetKeepAlive(30 * time.Second) opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config { // Look up the per-source liveness state (registered in main) so we diff --git a/cmd/ingestor/mqtt_watchdog.go b/cmd/ingestor/mqtt_watchdog.go index 8d4b22d8..899ff4c5 100644 --- a/cmd/ingestor/mqtt_watchdog.go +++ b/cmd/ingestor/mqtt_watchdog.go @@ -292,12 +292,30 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st // First detection — fire WARN edge. emit(msg) atomic.StoreInt64(&s.LastAlertUnix, now.Unix()) + // #1335: ONLY LivenessStalled (paho reports connected but no + // messages past threshold — classic half-open TCP) gets + // force-reconnected. LivenessNeverReceived is almost always + // an ACL deny / wrong channel hash — a new TCP socket won't + // fix it and would just churn the broker. The distinct + // "NEVER received" alarm is the right operator signal for + // that class. + if kind == LivenessStalled { + maybeForceReconnect(s, now, emit) + } return } // Already alerted; only re-emit on heartbeat interval to avoid log flood. if now.Sub(time.Unix(lastAlert, 0)) >= livenessHeartbeatInterval { emit(fmt.Sprintf("MQTT [%s] WATCHDOG heartbeat: still stalled — %s", s.Tag, msg)) atomic.StoreInt64(&s.LastAlertUnix, now.Unix()) + // Heartbeat re-emit on a still-Stalled source: try another + // force-reconnect IF the throttle window has elapsed. Under + // a persistent broker issue this caps at one attempt per + // heartbeat (1h) — orders of magnitude under any rate + // limit and well within "don't hammer the broker". + if kind == LivenessStalled { + maybeForceReconnect(s, now, emit) + } } case LivenessOK: if lastAlert != 0 { @@ -314,3 +332,31 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st } } +// maybeForceReconnect invokes ForceReconnectFn IFF (a) one is wired and +// (b) the throttle window (forceReconnectThrottle) has elapsed since +// the most recent forced reconnect for this source. Logs WATCHDOG +// telemetry before/after so operators can correlate the reconnect with +// downstream paho ConnectionAttempt/OnConnect lines. +func maybeForceReconnect(s *SourceLivenessState, now time.Time, emit func(...any)) { + if s.ForceReconnectFn == nil { + return + } + lastForce := atomic.LoadInt64(&s.LastForceReconnectUnix) + if lastForce != 0 && now.Sub(time.Unix(lastForce, 0)) < forceReconnectThrottle { + emit(fmt.Sprintf("MQTT [%s] WATCHDOG suppressing forced reconnect (last attempt %s ago, throttle %s)", + s.Tag, now.Sub(time.Unix(lastForce, 0)).Round(time.Second), forceReconnectThrottle)) + return + } + atomic.StoreInt64(&s.LastForceReconnectUnix, now.Unix()) + emit(fmt.Sprintf("MQTT [%s] WATCHDOG forcing reconnect (half-open TCP suspected — paho.IsConnected==true but no messages)", s.Tag)) + // Run in a goroutine: ForceReconnectFn typically calls + // client.Disconnect(250) which blocks up to 250ms, then + // client.Connect() which can block on the connect timeout. The + // watchdog goroutine must not stall a per-tick scan over a single + // slow source. + go func() { + s.ForceReconnectFn() + emit(fmt.Sprintf("MQTT [%s] WATCHDOG reconnect attempt issued", s.Tag)) + }() +} +