Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/ingestor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,24 @@ func LoadConfig(path string) (*Config, error) {
}

// ResolvedSources returns the final list of MQTT sources to connect to.
//
// Scheme mapping:
//
// mqtt:// → tcp:// (paho plain TCP)
// mqtts:// → ssl:// (paho TLS over TCP)
// ws:// (paho WebSocket — passed through, no mapping needed)
// wss:// (paho WebSocket TLS — passed through, no mapping needed)
func (c *Config) ResolvedSources() []MQTTSource {
for i := range c.MQTTSources {
// paho uses tcp:// and ssl:// not mqtt:// and mqtts://
// paho uses tcp:// and ssl:// for plain MQTT; ws:// and wss:// are accepted natively.
b := c.MQTTSources[i].Broker
if strings.HasPrefix(b, "mqtt://") {
c.MQTTSources[i].Broker = "tcp://" + b[7:]
} else if strings.HasPrefix(b, "mqtts://") {
c.MQTTSources[i].Broker = "ssl://" + b[8:]
}
// ws:// and wss:// pass through unchanged — paho handles WebSocket
// connections natively via gorilla/websocket.
}
return c.MQTTSources
}
90 changes: 90 additions & 0 deletions cmd/ingestor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,93 @@ func TestMQTTSourceRegionField(t *testing.T) {
t.Fatalf("expected region PDX, got %q", cfg.MQTTSources[0].Region)
}
}

// TestResolvedSourcesSchemeMapping verifies that mqtt:// and mqtts:// are translated
// to the paho-native tcp:// and ssl:// schemes, while ws:// and wss:// pass through
// unchanged (paho handles WebSocket connections natively).
func TestResolvedSourcesSchemeMapping(t *testing.T) {
tests := []struct {
input string
want string
}{
{"mqtt://host:1883", "tcp://host:1883"},
{"mqtts://host:8883", "ssl://host:8883"},
{"tcp://host:1883", "tcp://host:1883"},
{"ssl://host:8883", "ssl://host:8883"},
{"ws://host:9001", "ws://host:9001"},
{"wss://host:9001", "wss://host:9001"},
{"ws://host:9001/mqtt", "ws://host:9001/mqtt"},
{"wss://host:9001/mqtt", "wss://host:9001/mqtt"},
}

for _, tt := range tests {
cfg := &Config{
MQTTSources: []MQTTSource{
{Name: "test", Broker: tt.input, Topics: []string{"meshcore/#"}},
},
}
sources := cfg.ResolvedSources()
if got := sources[0].Broker; got != tt.want {
t.Errorf("ResolvedSources(%q) = %q, want %q", tt.input, got, tt.want)
}
}
}

// TestLoadConfigWSSource verifies that a WebSocket MQTT source round-trips through
// LoadConfig correctly — username/password preserved, scheme unchanged.
func TestLoadConfigWSSource(t *testing.T) {
t.Setenv("DB_PATH", "")
t.Setenv("MQTT_BROKER", "")

dir := t.TempDir()
cfgPath := filepath.Join(dir, "config.json")
os.WriteFile(cfgPath, []byte(`{
"dbPath": "test.db",
"mqttSources": [
{
"name": "local-tcp",
"broker": "mqtt://localhost:1883",
"topics": ["meshcore/#"]
},
{
"name": "wsmqtt-ws",
"broker": "wss://wsmqtt.example.com/mqtt",
"username": "corescope",
"password": "s3cr3t",
"topics": ["meshcore/#"]
}
]
}`), 0o644)

cfg, err := LoadConfig(cfgPath)
if err != nil {
t.Fatal(err)
}
if len(cfg.MQTTSources) != 2 {
t.Fatalf("mqttSources len=%d, want 2", len(cfg.MQTTSources))
}

tcp := cfg.MQTTSources[0]
if tcp.Name != "local-tcp" {
t.Errorf("name=%s, want local-tcp", tcp.Name)
}

ws := cfg.MQTTSources[1]
if ws.Name != "wsmqtt-ws" {
t.Errorf("name=%s, want wsmqtt-ws", ws.Name)
}
if ws.Broker != "wss://wsmqtt.example.com/mqtt" {
t.Errorf("broker=%s, want wss://wsmqtt.example.com/mqtt", ws.Broker)
}
if ws.Username != "corescope" {
t.Errorf("username=%s, want corescope", ws.Username)
}
if ws.Password != "s3cr3t" {
t.Errorf("password=%s, want s3cr3t", ws.Password)
}

sources := cfg.ResolvedSources()
if sources[1].Broker != "wss://wsmqtt.example.com/mqtt" {
t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker)
}
}
4 changes: 3 additions & 1 deletion cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,9 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
}
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
} else if strings.HasPrefix(source.Broker, "ssl://") {
} else if strings.HasPrefix(source.Broker, "ssl://") || strings.HasPrefix(source.Broker, "wss://") {
// TLS with system CA pool — valid for ssl:// MQTT brokers and
// wss:// WebSocket brokers behind a publicly-trusted certificate.
opts.SetTLSConfig(&tls.Config{})
}
return opts
Expand Down
13 changes: 11 additions & 2 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@
],
"region": "SJC",
"connectTimeoutSec": 45
},
{
"_comment": "WebSocket MQTT broker (e.g. meshcore-mqtt-broker). Use ws:// for plain WebSocket or wss:// for TLS. Username/password supported.",
"name": "wsmqtt",
"broker": "wss://wsmqtt.example.com/mqtt",
"username": "corescope",
"password": "your-password",
"topics": [
"meshcore/#"
]
}
],
"channelKeys": {
Expand Down Expand Up @@ -262,7 +272,7 @@
"criticalMv": 3000,
"_comment": "Voltage cutoffs (millivolts) for the per-node battery trend chart on /node-analytics. Latest sample below lowMv shows the node as ⚠️ Low; below criticalMv shows 🪫 Critical. Both default to 3300 / 3000 if omitted. Source data: observer_metrics.battery_mv populated from observer status messages; only nodes that are themselves observers (matching pubkey ↔ observer id) yield a series. Issue #663."
},
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).",
"_comment_mqttSources": "Each source connects to an MQTT broker. Supported schemes: mqtt:// (plain TCP), mqtts:// (TLS), ws:// (WebSocket), wss:// (WebSocket TLS). topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).",
"compression": {
"gzip": false,
"websocket": false,
Expand All @@ -279,7 +289,6 @@
]
},
"_comment_compression": "Opt-in HTTP gzip middleware + WebSocket permessage-deflate. Both default to false — enable ONLY when your upstream reverse proxy is NOT already compressing. gzip: enables the gzipMiddleware wrapper around the HTTP handler. websocket: sets gorilla websocket Upgrader.EnableCompression. level: gzip compression level 1..9 (1=BestSpeed, 9=BestCompression, default 6). minSizeBytes: advisory minimum response size below which compression would not pay off. contentTypes: MIME allow-list — only responses with these Content-Type values are compressed. Already-compressed types (image/*, video/*, audio/*, application/zip, application/x-gzip, application/pdf, application/octet-stream) are always skipped, as are responses whose handler already set Content-Encoding. Omit contentTypes to use the built-in default allow-list.",
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).",
"_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.",
"_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.",
"hashRegions": [
Expand Down