Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,31 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
if tag == "" {
tag = source.Broker
}
// #1337: paho defaults silently throttle delivery on this broker.
// - CleanSession=true + empty ClientID (random per reconnect) made the
// broker treat every reconnect as a brand-new session and discard the
// backlog it had queued since the previous disconnect. With watchdog
// reconnects every ~5min on staging, this lost ~99% of messages.
// - Order=true serialized the default publish handler; one slow packet
// blocked all others, compounding the loss under bursts.
// Fix: persistent unique ClientID + CleanSession=false (broker keeps
// our subscription state across reconnects and forwards what we missed),
// explicit KeepAlive so half-open TCP is detected at the paho layer, and
// Order=false for parallel handler dispatch.
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown-host"
}
clientID := "corescope-ingestor-" + hostname + "-" + tag

opts := mqtt.NewClientOptions().
AddBroker(source.Broker).
SetClientID(clientID).
SetCleanSession(false).
SetKeepAlive(30 * time.Second).
SetOrderMatters(false).
SetAutoReconnect(true).
SetConnectRetry(true).
SetOrderMatters(true).
SetMaxReconnectInterval(30 * time.Second).
SetConnectTimeout(10 * time.Second).
SetWriteTimeout(10 * time.Second)
Expand Down
85 changes: 85 additions & 0 deletions cmd/ingestor/mqtt_session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"os"
"strings"
"testing"
"time"
)

// Issue #1337: paho client misconfigured — ingestor receives 200× fewer
// messages than mosquitto_sub on the same broker/creds/topics. Root cause
// (hypothesis 1+5): paho defaults — CleanSession=true, empty ClientID
// (auto-random per reconnect), Order=true (handler serialized) — combined
// with the reconnect-every-5min watchdog meant the broker dropped queued
// messages on every reconnect AND the handler couldn't keep up under load.
//
// These tests pin the four paho options that fix the gap:
// 1. CleanSession=false — broker keeps the subscription state across
// reconnects instead of treating each dial
// as a brand-new session.
// 2. ClientID = persistent — broker recognizes the returning session.
// Empty ClientID makes paho generate a fresh
// random one on every reconnect, which is
// treated as a new client by the broker.
// 3. KeepAlive = 30s — half-open TCP detected at the paho layer
// instead of waiting for OS keepalive.
// 4. Order = false — handler dispatch is parallel; one slow
// packet does not block all the others.
//
// All four must be set in buildMQTTOpts. This test fails on master.

func TestBuildMQTTOpts_PersistentSession_Issue1337(t *testing.T) {
source := MQTTSource{
Broker: "ssl://broker.example:8883",
Name: "sjc-test",
}
opts := buildMQTTOpts(source)

if opts.CleanSession {
t.Error("CleanSession must be false (#1337): broker drops queued msgs across reconnects when true")
}

host, _ := os.Hostname()
if opts.ClientID == "" {
t.Fatal("ClientID must be set to a persistent value (#1337): empty = paho generates random per reconnect, broker treats every reconnect as new session")
}
if !strings.Contains(opts.ClientID, "sjc-test") {
t.Errorf("ClientID must embed source name for uniqueness across sources, got %q", opts.ClientID)
}
if host != "" && !strings.Contains(opts.ClientID, host) {
t.Errorf("ClientID must embed hostname for uniqueness across deployments, got %q (host=%q)", opts.ClientID, host)
}

if opts.KeepAlive != int64((30 * time.Second).Seconds()) {
t.Errorf("KeepAlive must be 30s (#1337): got %ds — needed so paho detects half-open TCP", opts.KeepAlive)
}

if opts.Order {
t.Error("Order must be false (#1337): default true serializes handler dispatch; a slow packet stalls all others")
}
}

// Stability: ClientID must be deterministic for a given (hostname, source)
// across two builds. Otherwise reconnect = new session = lost backlog.
func TestBuildMQTTOpts_ClientIDStableAcrossBuilds_Issue1337(t *testing.T) {
source := MQTTSource{Broker: "ssl://broker.example:8883", Name: "stable-test"}
a := buildMQTTOpts(source).ClientID
b := buildMQTTOpts(source).ClientID
if a == "" {
t.Fatal("ClientID empty")
}
if a != b {
t.Errorf("ClientID must be stable across buildMQTTOpts calls (#1337): %q vs %q — random = broker drops session on reconnect", a, b)
}
}

// Distinct sources must NOT share a ClientID — broker disconnects the older
// session whenever a duplicate ClientID connects, causing flapping.
func TestBuildMQTTOpts_ClientIDUniquePerSource_Issue1337(t *testing.T) {
a := buildMQTTOpts(MQTTSource{Broker: "ssl://a:8883", Name: "alpha"}).ClientID
b := buildMQTTOpts(MQTTSource{Broker: "ssl://b:8883", Name: "beta"}).ClientID
if a == b {
t.Errorf("distinct sources must get distinct ClientIDs (#1337): both got %q — duplicate IDs cause broker to disconnect the older one, infinite flap", a)
}
}