Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ func main() {
// Registration BEFORE Connect so the attempt counter is available
// to OnConnectAttempt on the very first dial.
liveness.IsConnectedFn = client.IsConnected
// #1335: wire force-reconnect so the watchdog can drop a
// half-open TCP socket and re-dial when paho.IsConnected==true
// but no messages have flowed past the stall threshold. Throttled
// per source by the watchdog itself (forceReconnectThrottle).
// Disconnect(250) gives in-flight publishes 250ms to drain;
// Connect() returns immediately and paho's reconnect machinery
// takes over from there. Captured-by-value `client` is the same
// pointer used everywhere else for this source.
liveness.ForceReconnectFn = func() {
client.Disconnect(250)
client.Connect()
}
// PR #1216 r2 item 3: tag collisions used to log.Fatalf, which
// killed the entire ingestor over one config typo and recreated
// the #1212 total-ingest-stop class this PR exists to prevent.
Expand Down Expand Up @@ -371,7 +383,16 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
SetOrderMatters(true).
SetMaxReconnectInterval(30 * time.Second).
SetConnectTimeout(10 * time.Second).
SetWriteTimeout(10 * time.Second)
SetWriteTimeout(10 * time.Second).
// #1335: MQTT-level keepalive (PINGREQ/PINGRESP) surfaces a half-open
// socket within ~30-60s instead of waiting for the application-level
// watchdog (5m) to notice no messages. paho's PINGREQ cadence is
// driven by this interval — if the broker's PINGRESP doesn't arrive,
// ConnectionLost fires and auto-reconnect kicks in. This was previously
// unset, which already meant paho's default of 30s; it is set
// explicitly so it can't drift, and so operators reading the code
// know it's intentional per the #1335 RCA.
SetKeepAlive(30 * time.Second)

opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
// Look up the per-source liveness state (registered in main) so we
Expand Down
66 changes: 66 additions & 0 deletions cmd/ingestor/mqtt_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
// shift, infrequent enough not to spam ops chat.
const livenessHeartbeatInterval = time.Hour

// forceReconnectThrottle is the minimum interval between forced
// reconnects on the SAME source. It is enforced by maybeForceReconnect,
// which compares it against SourceLivenessState.LastForceReconnectUnix
// and suppresses (with a log line) any attempt made inside the window.
const forceReconnectThrottle = 60 * time.Second

// LivenessKind enumerates the watchdog verdicts for a source. Edge-triggered
// transitions use this to decide whether to emit (and what severity).
type LivenessKind int
Expand Down Expand Up @@ -63,6 +67,22 @@ type SourceLivenessState struct {
StartedAt int64 // atomic; unix seconds when the source was registered / last reconnected (transient-stall tracking)
LastAlertUnix int64 // atomic; unix seconds of last emit (WARN or heartbeat); 0 means quiet
IsConnectedFn func() bool
// ForceReconnectFn (#1335) is called by the watchdog when a source
// transitions INTO LivenessStalled. It must force the paho client
// to drop its current TCP socket and re-establish (typically
// client.Disconnect(250) followed by client.Connect()). Half-open
// TCP sockets (Azure NAT idle timeout) report IsConnected==true so
// paho's own auto-reconnect never fires; this is the recovery path.
// May be nil (tests, or sources registered before wiring); the
// watchdog must treat that as a safe no-op. Invocations are
// throttled at forceReconnectThrottle per source so a
// stall→reconnect→re-stall loop self-recovers without hammering
// the broker.
ForceReconnectFn func()
// LastForceReconnectUnix is the unix-seconds timestamp of the most
// recent forced reconnect for this source; the watchdog reads it
// to enforce forceReconnectThrottle. atomic.
LastForceReconnectUnix int64
// AttemptCount is incremented on every TCP/TLS connection attempt. Used
// by ConnectionAttemptHandler to log attempt # independent of paho's
// internal reconnect-loop state. atomic.
Expand Down Expand Up @@ -272,12 +292,30 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st
// First detection — fire WARN edge.
emit(msg)
atomic.StoreInt64(&s.LastAlertUnix, now.Unix())
// #1335: ONLY LivenessStalled (paho reports connected but no
// messages past threshold — classic half-open TCP) gets
// force-reconnected. LivenessNeverReceived is almost always
// an ACL deny / wrong channel hash — a new TCP socket won't
// fix it and would just churn the broker. The distinct
// "NEVER received" alarm is the right operator signal for
// that class.
if kind == LivenessStalled {
maybeForceReconnect(s, now, emit)
}
return
}
// Already alerted; only re-emit on heartbeat interval to avoid log flood.
if now.Sub(time.Unix(lastAlert, 0)) >= livenessHeartbeatInterval {
emit(fmt.Sprintf("MQTT [%s] WATCHDOG heartbeat: still stalled — %s", s.Tag, msg))
atomic.StoreInt64(&s.LastAlertUnix, now.Unix())
// Heartbeat re-emit on a still-Stalled source: try another
// force-reconnect IF the throttle window has elapsed. Under
// a persistent broker issue this caps at one attempt per
// heartbeat (1h) — orders of magnitude under any rate
// limit and well within "don't hammer the broker".
if kind == LivenessStalled {
maybeForceReconnect(s, now, emit)
}
}
case LivenessOK:
if lastAlert != 0 {
Expand All @@ -294,3 +332,31 @@ func processLivenessTransition(s *SourceLivenessState, kind LivenessKind, msg st
}
}

// maybeForceReconnect invokes s.ForceReconnectFn IFF (a) one is wired and
// (b) at least forceReconnectThrottle has elapsed since the most recent
// forced reconnect recorded in s.LastForceReconnectUnix. It logs WATCHDOG
// telemetry before/after so operators can correlate the reconnect with
// downstream paho ConnectionAttempt/OnConnect lines.
//
// Parameters:
//   - s: per-source liveness state; supplies Tag (for log lines), the
//     reconnect hook and the throttle timestamp.
//   - now: the caller's notion of the current time. It is used for the
//     throttle comparison and, truncated to whole seconds, stored as the
//     new throttle timestamp.
//   - emit: log sink. It is called synchronously for the "suppressing" and
//     "forcing" lines and from the spawned goroutine for the completion
//     line, so it must be safe to call from another goroutine.
//
// The throttle slot is claimed with a compare-and-swap so two overlapping
// callers cannot both pass the check and fire twice. A panic raised by the
// hook is recovered and reported through emit: the hook runs on a detached
// goroutine, where an unrecovered panic would terminate the whole process.
func maybeForceReconnect(s *SourceLivenessState, now time.Time, emit func(...any)) {
	// Snapshot the hook once so the nil check and the goroutine below are
	// guaranteed to see the same value.
	reconnect := s.ForceReconnectFn
	if reconnect == nil {
		return
	}

	nowUnix := now.Unix()
	for {
		lastForce := atomic.LoadInt64(&s.LastForceReconnectUnix)
		// Zero means "never forced"; anything else is subject to the throttle.
		if lastForce != 0 {
			if elapsed := now.Sub(time.Unix(lastForce, 0)); elapsed < forceReconnectThrottle {
				emit(fmt.Sprintf("MQTT [%s] WATCHDOG suppressing forced reconnect (last attempt %s ago, throttle %s)",
					s.Tag, elapsed.Round(time.Second), forceReconnectThrottle))
				return
			}
		}
		// Claim the slot only if nobody else updated it since we looked.
		// On failure re-evaluate against the fresh timestamp, which will
		// normally land in the suppress branch above.
		if atomic.CompareAndSwapInt64(&s.LastForceReconnectUnix, lastForce, nowUnix) {
			break
		}
	}

	emit(fmt.Sprintf("MQTT [%s] WATCHDOG forcing reconnect (half-open TCP suspected — paho.IsConnected==true but no messages)", s.Tag))
	// Run in a goroutine: ForceReconnectFn typically calls
	// client.Disconnect(250) which blocks up to 250ms, then
	// client.Connect() which can block on the connect timeout. The
	// watchdog goroutine must not stall a per-tick scan over a single
	// slow source.
	go func() {
		defer func() {
			if r := recover(); r != nil {
				emit(fmt.Sprintf("MQTT [%s] WATCHDOG forced reconnect panicked: %v", s.Tag, r))
			}
		}()
		reconnect()
		emit(fmt.Sprintf("MQTT [%s] WATCHDOG reconnect attempt issued", s.Tag))
	}()
}

174 changes: 174 additions & 0 deletions cmd/ingestor/mqtt_watchdog_force_reconnect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package main

import (
"sync"
"sync/atomic"
"testing"
"time"
)

// Issue #1335 — staging's lincomatic source stalls: paho reports
// IsConnected==true but no messages arrive for 1h+. The PR #1216
// watchdog DETECTS this (LivenessStalled) but only LOGS — it never
// forces paho to drop the half-open TCP socket and reconnect, so the
// source stays silently broken until container restart.
//
// Fix: on transition INTO LivenessStalled, invoke a per-source
// ForceReconnectFn (wired in main.go to client.Disconnect(250) +
// client.Connect()). Throttled by forceReconnectThrottle so a
// stall→reconnect→re-stall loop self-recovers without hammering the
// broker.

// RED on master: ForceReconnectFn is never invoked because the
// transition engine does not call it. After the fix, the WARN edge on
// LivenessStalled MUST fire force-reconnect exactly once.
//
// The emit sink is mutex-guarded because the production code calls it
// both synchronously and from the reconnect goroutine; the failure path
// below therefore copies the captured lines under the same mutex before
// formatting them.
func TestMQTTStallWatchdog_ForceReconnectOnStallEdge(t *testing.T) {
	defer snapshotAndResetRegistry(t)()

	now := time.Now()
	var reconnectCount atomic.Int32
	s := &SourceLivenessState{
		Tag:              "stalled-half-open",
		Broker:           "tcp://halfopen.example:1883",
		IsConnectedFn:    func() bool { return true },
		ForceReconnectFn: func() { reconnectCount.Add(1) },
	}
	// Last message 10 minutes ago on a source registered 20 minutes ago.
	atomic.StoreInt64(&s.LastMessageUnix, now.Add(-10*time.Minute).Unix())
	atomic.StoreInt64(&s.StartedAt, now.Add(-20*time.Minute).Unix())
	if err := registerLivenessState(s); err != nil {
		t.Fatalf("setup: %v", err)
	}

	var mu sync.Mutex
	var emits []string
	emit := func(args ...any) {
		mu.Lock()
		defer mu.Unlock()
		if len(args) > 0 {
			if str, ok := args[0].(string); ok {
				emits = append(emits, str)
			}
		}
	}

	processLivenessTransition(s, LivenessStalled, "10m silent", now, emit)

	// ForceReconnectFn runs in a goroutine (the production code can't
	// block the watchdog tick on a slow Disconnect+Connect). Wait
	// briefly for it to land before asserting.
	waitForReconnect(t, &reconnectCount, 1, 2*time.Second)

	if got := reconnectCount.Load(); got != 1 {
		// The reconnect goroutine may still be appending its trailing
		// log line, so snapshot under the lock instead of reading the
		// slice directly.
		mu.Lock()
		seen := append([]string(nil), emits...)
		mu.Unlock()
		t.Fatalf("LivenessStalled transition MUST force-reconnect exactly once; got %d invocations (emits=%v)", got, seen)
	}
}

// Throttle behaviour: once a forced reconnect has fired, a further
// LivenessStalled edge inside forceReconnectThrottle must not fire a
// second one (no broker hammering); an edge after the window must.
func TestMQTTStallWatchdog_ForceReconnectThrottled(t *testing.T) {
	restoreRegistry := snapshotAndResetRegistry(t)
	defer restoreRegistry()

	base := time.Now()
	var fired atomic.Int32
	src := &SourceLivenessState{
		Tag:              "throttled",
		Broker:           "tcp://x:1883",
		IsConnectedFn:    func() bool { return true },
		ForceReconnectFn: func() { fired.Add(1) },
	}
	if err := registerLivenessState(src); err != nil {
		t.Fatalf("setup: %v", err)
	}

	discard := func(args ...any) {}
	// stallAt drives one LivenessStalled verdict through the engine.
	stallAt := func(msg string, at time.Time) {
		processLivenessTransition(src, LivenessStalled, msg, at, discard)
	}

	// Edge #1: nothing forced yet, so the hook must run.
	stallAt("stall 1", base)
	waitForReconnect(t, &fired, 1, 2*time.Second)

	// Pretend paho reconnected (MarkReconnected resets the alert
	// cooldown) and the source stalls again only seconds later.
	src.MarkReconnected(base.Add(5 * time.Second))
	stallAt("stall 2", base.Add(10*time.Second))
	// Allow a wrongly-spawned goroutine time to show up in the counter.
	time.Sleep(100 * time.Millisecond)

	if n := fired.Load(); n != 1 {
		t.Fatalf("force-reconnect MUST be throttled within %s; got %d invocations", forceReconnectThrottle, n)
	}

	// Edge #3 lands beyond the throttle window and may fire again.
	src.MarkReconnected(base.Add(30 * time.Second))
	stallAt("stall 3", base.Add(forceReconnectThrottle+30*time.Second))
	waitForReconnect(t, &fired, 2, 2*time.Second)
	if n := fired.Load(); n != 2 {
		t.Fatalf("after throttle window, force-reconnect must re-arm; got %d invocations", n)
	}
}

// A LivenessNeverReceived verdict (cold-start ACL deny / traffic that
// never flowed) must leave the connection alone: a fresh TCP socket does
// not cure a SUBSCRIBE ACL deny and would only churn the broker. The
// separate "NEVER received" alarm is what points operators at the ACL.
func TestMQTTStallWatchdog_NoForceReconnectOnNeverReceived(t *testing.T) {
	restoreRegistry := snapshotAndResetRegistry(t)
	defer restoreRegistry()

	ts := time.Now()
	var invocations atomic.Int32
	src := &SourceLivenessState{
		Tag:              "acl-denied",
		Broker:           "tcp://x:1883",
		IsConnectedFn:    func() bool { return true },
		ForceReconnectFn: func() { invocations.Add(1) },
	}
	if err := registerLivenessState(src); err != nil {
		t.Fatalf("setup: %v", err)
	}

	discard := func(args ...any) {}
	processLivenessTransition(src, LivenessNeverReceived, "no msgs ever", ts, discard)

	// Grace period so an (incorrectly) spawned goroutine would be counted.
	time.Sleep(100 * time.Millisecond)

	got := invocations.Load()
	if got == 0 {
		return
	}
	t.Fatalf("LivenessNeverReceived must NOT force-reconnect (likely ACL deny — TCP churn won't help); got %d invocations", got)
}

// Safety net: when no ForceReconnectFn has been wired (tests, or a source
// registered before the wiring existed), a LivenessStalled verdict must
// be handled without panicking.
func TestMQTTStallWatchdog_NilForceReconnectFnIsSafe(t *testing.T) {
	restoreRegistry := snapshotAndResetRegistry(t)
	defer restoreRegistry()

	ts := time.Now()
	// ForceReconnectFn is intentionally left at its zero value (nil).
	src := &SourceLivenessState{
		Tag:           "no-reconnect-fn",
		Broker:        "tcp://x:1883",
		IsConnectedFn: func() bool { return true },
	}
	if err := registerLivenessState(src); err != nil {
		t.Fatalf("setup: %v", err)
	}

	// Turn a panic on this goroutine into a regular test failure.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		t.Fatalf("nil ForceReconnectFn must be a safe no-op; panicked: %v", r)
	}()

	discard := func(args ...any) {}
	processLivenessTransition(src, LivenessStalled, "stalled", ts, discard)
}

// waitForReconnect blocks until count has reached at least want, or until
// timeout has passed, whichever comes first. It polls every 5ms and never
// fails the test itself: on timeout it simply returns, leaving the caller
// to assert on the counter. Polling is needed because the production code
// runs ForceReconnectFn on its own goroutine, so the counter cannot be
// read synchronously after the transition call.
func waitForReconnect(t *testing.T, count *atomic.Int32, want int32, timeout time.Duration) {
	t.Helper()
	const pollEvery = 5 * time.Millisecond
	for limit := time.Now().Add(timeout); time.Now().Before(limit); time.Sleep(pollEvery) {
		if count.Load() >= want {
			return
		}
	}
}