diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 800faab7..f7b2e1f3 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -286,15 +286,24 @@ func LoadConfig(path string) (*Config, error) { } // ResolvedSources returns the final list of MQTT sources to connect to. +// +// Scheme mapping: +// +// mqtt:// → tcp:// (paho plain TCP) +// mqtts:// → ssl:// (paho TLS over TCP) +// ws:// (paho WebSocket — passed through, no mapping needed) +// wss:// (paho WebSocket TLS — passed through, no mapping needed) func (c *Config) ResolvedSources() []MQTTSource { for i := range c.MQTTSources { - // paho uses tcp:// and ssl:// not mqtt:// and mqtts:// + // paho uses tcp:// and ssl:// for plain MQTT; ws:// and wss:// are accepted natively. b := c.MQTTSources[i].Broker if strings.HasPrefix(b, "mqtt://") { c.MQTTSources[i].Broker = "tcp://" + b[7:] } else if strings.HasPrefix(b, "mqtts://") { c.MQTTSources[i].Broker = "ssl://" + b[8:] } + // ws:// and wss:// pass through unchanged — paho handles WebSocket + // connections natively via gorilla/websocket. } return c.MQTTSources } diff --git a/cmd/ingestor/config_test.go b/cmd/ingestor/config_test.go index cb9e6098..d1b984bc 100644 --- a/cmd/ingestor/config_test.go +++ b/cmd/ingestor/config_test.go @@ -394,3 +394,93 @@ func TestMQTTSourceRegionField(t *testing.T) { t.Fatalf("expected region PDX, got %q", cfg.MQTTSources[0].Region) } } + +// TestResolvedSourcesSchemeMapping verifies that mqtt:// and mqtts:// are translated +// to the paho-native tcp:// and ssl:// schemes, while ws:// and wss:// pass through +// unchanged (paho handles WebSocket connections natively). +func TestResolvedSourcesSchemeMapping(t *testing.T) { + tests := []struct { + input string + want string + }{ + {"mqtt://host:1883", "tcp://host:1883"}, + {"mqtts://host:8883", "ssl://host:8883"}, + {"tcp://host:1883", "tcp://host:1883"}, + {"ssl://host:8883", "ssl://host:8883"}, + {"ws://host:9001", "ws://host:9001"}, + {"wss://host:9001", "wss://host:9001"}, + {"ws://host:9001/mqtt", "ws://host:9001/mqtt"}, + {"wss://host:9001/mqtt", "wss://host:9001/mqtt"}, + } + + for _, tt := range tests { + cfg := &Config{ + MQTTSources: []MQTTSource{ + {Name: "test", Broker: tt.input, Topics: []string{"meshcore/#"}}, + }, + } + sources := cfg.ResolvedSources() + if got := sources[0].Broker; got != tt.want { + t.Errorf("ResolvedSources(%q) = %q, want %q", tt.input, got, tt.want) + } + } +} + +// TestLoadConfigWSSource verifies that a WebSocket MQTT source round-trips through +// LoadConfig correctly — username/password preserved, scheme unchanged. +func TestLoadConfigWSSource(t *testing.T) { + t.Setenv("DB_PATH", "") + t.Setenv("MQTT_BROKER", "") + + dir := t.TempDir() + cfgPath := filepath.Join(dir, "config.json") + os.WriteFile(cfgPath, []byte(`{ + "dbPath": "test.db", + "mqttSources": [ + { + "name": "local-tcp", + "broker": "mqtt://localhost:1883", + "topics": ["meshcore/#"] + }, + { + "name": "wsmqtt-ws", + "broker": "wss://wsmqtt.example.com/mqtt", + "username": "corescope", + "password": "s3cr3t", + "topics": ["meshcore/#"] + } + ] + }`), 0o644) + + cfg, err := LoadConfig(cfgPath) + if err != nil { + t.Fatal(err) + } + if len(cfg.MQTTSources) != 2 { + t.Fatalf("mqttSources len=%d, want 2", len(cfg.MQTTSources)) + } + + tcp := cfg.MQTTSources[0] + if tcp.Name != "local-tcp" { + t.Errorf("name=%s, want local-tcp", tcp.Name) + } + + ws := cfg.MQTTSources[1] + if ws.Name != "wsmqtt-ws" { + t.Errorf("name=%s, want wsmqtt-ws", ws.Name) + } + if ws.Broker != "wss://wsmqtt.example.com/mqtt" { + t.Errorf("broker=%s, want wss://wsmqtt.example.com/mqtt", ws.Broker) + } + if ws.Username != "corescope" { + t.Errorf("username=%s, want corescope", ws.Username) + } + if ws.Password != "s3cr3t" { + t.Errorf("password=%s, want s3cr3t", ws.Password) + } + + sources := cfg.ResolvedSources() + if sources[1].Broker != "wss://wsmqtt.example.com/mqtt" { + t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker) + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index fc3be6ad..98b82abe 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -396,7 +396,9 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions { } if source.RejectUnauthorized != nil && !*source.RejectUnauthorized { opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) - } else if strings.HasPrefix(source.Broker, "ssl://") { + } else if strings.HasPrefix(source.Broker, "ssl://") || strings.HasPrefix(source.Broker, "wss://") { + // TLS with system CA pool — valid for ssl:// MQTT brokers and + // wss:// WebSocket brokers behind a publicly-trusted certificate. opts.SetTLSConfig(&tls.Config{}) } return opts diff --git a/config.example.json b/config.example.json index 41102e70..27c22b65 100644 --- a/config.example.json +++ b/config.example.json @@ -135,6 +135,16 @@ ], "region": "SJC", "connectTimeoutSec": 45 + }, + { + "_comment": "WebSocket MQTT broker (e.g. meshcore-mqtt-broker). Use ws:// for plain WebSocket or wss:// for TLS. Username/password supported.", + "name": "wsmqtt", + "broker": "wss://wsmqtt.example.com/mqtt", + "username": "corescope", + "password": "your-password", + "topics": [ + "meshcore/#" + ] } ], "channelKeys": { @@ -262,7 +272,7 @@ "criticalMv": 3000, "_comment": "Voltage cutoffs (millivolts) for the per-node battery trend chart on /node-analytics. Latest sample below lowMv shows the node as ⚠️ Low; below criticalMv shows 🪫 Critical. Both default to 3300 / 3000 if omitted. Source data: observer_metrics.battery_mv populated from observer status messages; only nodes that are themselves observers (matching pubkey ↔ observer id) yield a series. Issue #663." }, - "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).", + "_comment_mqttSources": "Each source connects to an MQTT broker. Supported schemes: mqtt:// (plain TCP), mqtts:// (TLS), ws:// (WebSocket), wss:// (WebSocket TLS). topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).", "compression": { "gzip": false, "websocket": false, @@ -279,7 +289,6 @@ ] }, "_comment_compression": "Opt-in HTTP gzip middleware + WebSocket permessage-deflate. Both default to false — enable ONLY when your upstream reverse proxy is NOT already compressing. gzip: enables the gzipMiddleware wrapper around the HTTP handler. websocket: sets gorilla websocket Upgrader.EnableCompression. level: gzip compression level 1..9 (1=BestSpeed, 9=BestCompression, default 6). minSizeBytes: advisory minimum response size below which compression would not pay off. contentTypes: MIME allow-list — only responses with these Content-Type values are compressed. Already-compressed types (image/*, video/*, audio/*, application/zip, application/x-gzip, application/pdf, application/octet-stream) are always skipped, as are responses whose handler already set Content-Encoding. Omit contentTypes to use the built-in default allow-list.", - "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).", "_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.", "_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.", "hashRegions": [