Skip to content

fix(mqtt): watchdog forces paho reconnect on stall — recovers from half-open TCP (closes #1335)#1336

Open
Kpa-clawbot wants to merge 2 commits into
masterfrom
fix/issue-1335
Open

fix(mqtt): watchdog forces paho reconnect on stall — recovers from half-open TCP (closes #1335)#1336
Kpa-clawbot wants to merge 2 commits into
masterfrom
fix/issue-1335

Conversation

@Kpa-clawbot
Copy link
Copy Markdown
Owner

RED f06887 — GREEN 8f53c1. CI: (will populate on PR open)

Fixes #1335

Problem

PR #1216 added per-source stall detection (LivenessStalled) but only logged. Staging's lincomatic source has been silently losing ~14k pkts/hr behind a half-open TCP socket the Azure NAT abandons: paho reports IsConnected==true, no messages arrive for 1h+, container restart is the only known recovery. Prod (MikroTik networking) doesn't see it.

Fix

Make the watchdog actually recover.

  • SourceLivenessState.ForceReconnectFn — per-source closure wired in main.go next to IsConnectedFn, wraps client.Disconnect(250) + client.Connect().
  • processLivenessTransition — on the LivenessStalled edge AND on every heartbeat re-emit while still Stalled, invoke maybeForceReconnect. LivenessNeverReceived (cold-start ACL deny / wrong hash) is deliberately not force-reconnected — a new TCP socket won't fix an ACL deny and would just churn the broker.
  • maybeForceReconnect — throttled at forceReconnectThrottle = 60s per source so a stall→reconnect→re-stall loop self-recovers without hammering the broker. The Disconnect+Connect runs in a goroutine so a single slow source can't stall the watchdog tick.
  • buildMQTTOpts — explicit SetKeepAlive(30 * time.Second). paho's default happens to be 30s, but the bug(mqtt): watchdog detects stall but does NOT force reconnect — staging losing ~14k pkts/hr behind half-open TCP #1335 RCA called this out — making it explicit so it can't drift and so operators reading the code know it's intentional.
  • TelemetryWATCHDOG forcing reconnect (intent), WATCHDOG reconnect attempt issued (post-goroutine), WATCHDOG suppressing forced reconnect (throttle window).

TDD

  • RED f06887mqtt_watchdog_force_reconnect_test.go. Stub field + constant added so the file compiles; assertions fail because processLivenessTransition never invokes ForceReconnectFn. Reverting just the s.ForceReconnectFn() call line from GREEN re-fails the same assertion (mutation verified).
  • GREEN 8f53c1 — wiring + throttle + keepalive.

Scope discipline

Additive only. No regression to currently-flowing sources: LivenessOK, LivenessRecovered, LivenessDisconnected, LivenessHeartbeat, and LivenessNeverReceived transitions are unchanged. Throttle bound = ≤1 reconnect/min/source = ≤60/hr worst-case across all sources, well within any broker rate limit.

Preflight: clean (all gates pass).

openclaw-bot added 2 commits May 23, 2026 22:32
Adds failing tests for #1335: when checkSourceLiveness returns
LivenessStalled (paho.IsConnected==true but no messages past threshold —
half-open TCP), processLivenessTransition must invoke a per-source
ForceReconnectFn so paho drops the dead socket and re-dials.

Also asserts:
- throttle: a second stall within forceReconnectThrottle (60s) does NOT
  fire a second reconnect (no broker hammering)
- NeverReceived MUST NOT force-reconnect (likely ACL deny — TCP churn
  won't help; the distinct cold-start alarm is the right signal)
- nil ForceReconnectFn is a safe no-op

Compiles, fails on assertion. GREEN commit follows.
…en TCP

#1335: PR #1216 added per-source stall DETECTION but only logged.
Staging's lincomatic source has been losing ~14k pkts/hr behind a
half-open TCP socket the Azure NAT silently abandons: paho reports
IsConnected==true, no messages arrive for an hour, container restart
is the only known recovery. Prod (MikroTik networking) doesn't see it.

Changes (all additive — no behavior change for currently-flowing sources):
- SourceLivenessState.ForceReconnectFn: per-source closure that wraps
  client.Disconnect(250) + client.Connect(). Wired in main.go right
  next to IsConnectedFn.
- processLivenessTransition: on the LivenessStalled edge AND on every
  heartbeat re-emit while still Stalled, invoke maybeForceReconnect.
  LivenessNeverReceived (cold-start ACL deny / wrong hash) is
  deliberately NOT force-reconnected — a new TCP socket won't fix an
  ACL deny and would just churn the broker.
- maybeForceReconnect: throttled at forceReconnectThrottle (60s) per
  source so a stall→reconnect→re-stall loop self-recovers without
  hammering the broker. Runs the Disconnect+Connect in a goroutine
  so a single slow source can't stall the watchdog tick.
- buildMQTTOpts: explicit SetKeepAlive(30s). paho's default happens
  to be 30s but #1335 RCA called this out specifically — making it
  explicit so it can't drift, and so operators reading the code know
  it's intentional.
- WATCHDOG telemetry: 'forcing reconnect' (intent), 'reconnect attempt
  issued' (post-goroutine), 'suppressing forced reconnect' (throttle).

Test mqtt_watchdog_force_reconnect_test.go covers: stall edge fires,
throttle suppresses second attempt within 60s, throttle re-arms after,
NeverReceived does NOT force-reconnect, nil ForceReconnectFn is safe.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant