From ae77e880e76b366656131515a6819f620438b075 Mon Sep 17 00:00:00 2001 From: maxcillius Date: Thu, 14 May 2026 17:00:25 +0530 Subject: [PATCH 1/5] feat(inputs.socket_listener): add SSM/IGMPv3 support via multicast_source option Signed-off-by: maxcillius --- plugins/common/socket/datagram.go | 46 +++++++++++++++++++++- plugins/common/socket/socket.go | 7 ++-- plugins/inputs/socket_listener/sample.conf | 6 +++ 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 42f87417c1997..5bc060f74b6d5 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -15,6 +15,8 @@ import ( "time" "github.com/alitto/pond" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" @@ -23,6 +25,7 @@ import ( type packetListener struct { AllowedSources []net.IP + MulticastSource string Encoding string MaxDecompressionSize int64 SocketMode string @@ -36,9 +39,10 @@ type packetListener struct { parsePool *pond.WorkerPool } -func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int, allowedSources []net.IP, logger telegraf.Logger) *packetListener { +func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int, allowedSources []net.IP, multicastSource string, logger telegraf.Logger) *packetListener { return &packetListener{ AllowedSources: allowedSources, + MulticastSource: multicastSource, Encoding: encoding, Log: logger, MaxDecompressionSize: int64(maxDecompressionSize), @@ -202,7 +206,12 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err return fmt.Errorf("resolving address of %q failed: %w", ifname, err) } } - conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr) + + if l.MulticastSource != "" { + conn, err = listenSSM(u.Scheme, iface, addr, l.MulticastSource) + } else { + conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr) + } if err != nil { return fmt.Errorf("listening (udp multicast) failed: %w", err) } @@ -280,3 +289,36 @@ func (l *packetListener) close() error { return nil } + +func listenSSM(network string, ifi *net.Interface, gaddr *net.UDPAddr, sourceAddr string) (*net.UDPConn, error) { + conn, err := net.ListenUDP(network, gaddr) + if err != nil { + return nil, fmt.Errorf("creating UDP socket failed: %w", err) + } + + src := &net.UDPAddr{IP: net.ParseIP(sourceAddr)} + if src.IP == nil { + conn.Close() + return nil, fmt.Errorf("invalid multicast_source address %q", sourceAddr) + } + + switch network { + case "udp4": + p := ipv4.NewPacketConn(conn) + if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { + conn.Close() + return nil, fmt.Errorf("joining SSM group (IGMPv3) failed: %w", err) + } + case "udp6": + p := ipv6.NewPacketConn(conn) + if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { + conn.Close() + return nil, fmt.Errorf("joining SSM group (MLDv2) failed: %w", err) + } + default: + conn.Close() + return nil, fmt.Errorf("ssm support failure for network %q, use udp4 or udp6", network) + } + + return conn, nil +} diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index c0518c43f87e4..26e2ae6609fcc 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -38,6 +38,7 @@ type Config struct { MaxDecompressionSize config.Size `toml:"max_decompression_size"` MaxParallelParsers int `toml:"max_parallel_parsers"` AllowedSources []net.IP `toml:"allowed_sources"` + MulticastSource string `toml:"multicast_source"` common_tls.ServerConfig } @@ -125,19 +126,19 @@ func (s *Socket) Setup() error { } s.listener = l case "udp", "udp4", "udp6": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil { return err } s.listener = l case "ip", "ip4", "ip6": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupIP(s.url); err != nil { return err } s.listener = l case "unixgram": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupUnixgram(s.url, s.SocketMode, int(s.ReadBufferSize)); err != nil { return err } diff --git a/plugins/inputs/socket_listener/sample.conf b/plugins/inputs/socket_listener/sample.conf index ee326ed20be00..2e7ae4c347304 100644 --- a/plugins/inputs/socket_listener/sample.conf +++ b/plugins/inputs/socket_listener/sample.conf @@ -98,3 +98,9 @@ ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # data_format = "influx" + + ## Optional source IP for Source-Specific Multicast (SSM / IGMPv3). + ## When set, the service_address must be a multicast group address, and + ## the socket will join using IP_ADD_SOURCE_MULTICAST instead of + ## IP_ADD_MEMBERSHIP. Requires a udp4:// or udp6:// service_address. + # multicast_source = "" \ No newline at end of file From 21f0e2a6a445b95e8cfb503c5a644aed9fc2fefe Mon Sep 17 00:00:00 2001 From: maxcillius Date: Tue, 19 May 2026 22:16:37 +0530 Subject: [PATCH 2/5] fix(inputs.socket_listener): address reviewer feedback Signed-off-by: maxcillius --- plugins/common/socket/datagram.go | 31 +++++++++++++--------- plugins/inputs/socket_listener/sample.conf | 5 +--- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 5bc060f74b6d5..9d4bcadb526fa 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -213,6 +213,9 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr) } if err != nil { + if conn != nil { + conn.Close() + } return fmt.Errorf("listening (udp multicast) failed: %w", err) } } else { @@ -291,33 +294,37 @@ func (l *packetListener) close() error { } func listenSSM(network string, ifi *net.Interface, gaddr *net.UDPAddr, sourceAddr string) (*net.UDPConn, error) { - conn, err := net.ListenUDP(network, gaddr) - if err != nil { - return nil, fmt.Errorf("creating UDP socket failed: %w", err) - } - src := &net.UDPAddr{IP: net.ParseIP(sourceAddr)} if src.IP == nil { - conn.Close() return nil, fmt.Errorf("invalid multicast_source address %q", sourceAddr) } + if network == "udp" { + if gaddr.IP.To4() != nil { + network = "udp4" + } else { + network = "udp6" + } + } + + conn, err := net.ListenUDP(network, gaddr) + if err != nil { + return nil, fmt.Errorf("creating UDP socket failed: %w", err) + } + switch network { case "udp4": p := ipv4.NewPacketConn(conn) if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { - conn.Close() - return nil, fmt.Errorf("joining SSM group (IGMPv3) failed: %w", err) + return conn, fmt.Errorf("joining SSM group (IGMPv3) failed: %w", err) } case "udp6": p := ipv6.NewPacketConn(conn) if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { - conn.Close() - return nil, fmt.Errorf("joining SSM group (MLDv2) failed: %w", err) + return conn, fmt.Errorf("joining SSM group (MLDv2) failed: %w", err) } default: - conn.Close() - return nil, fmt.Errorf("ssm support failure for network %q, use udp4 or udp6", network) + return conn, fmt.Errorf("ssm support failure for network %q, use udp4 or udp6", network) } return conn, nil diff --git a/plugins/inputs/socket_listener/sample.conf b/plugins/inputs/socket_listener/sample.conf index 2e7ae4c347304..accd000911f0b 100644 --- a/plugins/inputs/socket_listener/sample.conf +++ b/plugins/inputs/socket_listener/sample.conf @@ -99,8 +99,5 @@ ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # data_format = "influx" - ## Optional source IP for Source-Specific Multicast (SSM / IGMPv3). - ## When set, the service_address must be a multicast group address, and - ## the socket will join using IP_ADD_SOURCE_MULTICAST instead of - ## IP_ADD_MEMBERSHIP. Requires a udp4:// or udp6:// service_address. + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) # multicast_source = "" \ No newline at end of file From c12d52f8bf8395ed9b178f077edb72a2c62cb6d6 Mon Sep 17 00:00:00 2001 From: maxcillius Date: Tue, 19 May 2026 22:39:41 +0530 Subject: [PATCH 3/5] docs(inputs.socket_listener): shorten multicast_source comment in sample config Signed-off-by: maxcillius --- plugins/common/socket/socket.conf | 5 ++++- plugins/inputs/socket_listener/README.md | 3 +++ plugins/inputs/socket_listener/sample.conf | 6 +++--- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/plugins/common/socket/socket.conf b/plugins/common/socket/socket.conf index 52d872da44384..d60913af2a33d 100644 --- a/plugins/common/socket/socket.conf +++ b/plugins/common/socket/socket.conf @@ -38,4 +38,7 @@ ## List of allowed source IP addresses for incoming packets/messages. ## If not specified or empty, all sources are allowed. - # allowed_sources = [] \ No newline at end of file + # allowed_sources = [] + + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" \ No newline at end of file diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index 0255977c1f24a..5b28a9bd902af 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -89,6 +89,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Message splitting strategy and corresponding settings for stream sockets ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet ## listeners such as udp. diff --git a/plugins/inputs/socket_listener/sample.conf b/plugins/inputs/socket_listener/sample.conf index accd000911f0b..415577af8ea1a 100644 --- a/plugins/inputs/socket_listener/sample.conf +++ b/plugins/inputs/socket_listener/sample.conf @@ -55,6 +55,9 @@ ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Message splitting strategy and corresponding settings for stream sockets ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet ## listeners such as udp. @@ -98,6 +101,3 @@ ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md # data_format = "influx" - - ## Source IP for Source-Specific Multicast (SSM / IGMPv3) - # multicast_source = "" \ No newline at end of file From 67d1ceaf179850c0fe9bf6cb3787418b5ce9c611 Mon Sep 17 00:00:00 2001 From: maxcillius Date: Wed, 20 May 2026 19:08:40 +0530 Subject: [PATCH 4/5] docs(inputs.syslog): regenerate README and sample config after adding multicast_source Signed-off-by: maxcillius --- plugins/inputs/syslog/README.md | 3 +++ plugins/inputs/syslog/sample.conf | 3 +++ 2 files changed, 6 insertions(+) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index af2fc26a364c5..98d8100b4d84d 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -94,6 +94,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Framing technique used for messages transport ## Available settings are: ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 diff --git a/plugins/inputs/syslog/sample.conf b/plugins/inputs/syslog/sample.conf index c60d886e12c92..cde4ec0c6f1d6 100644 --- a/plugins/inputs/syslog/sample.conf +++ b/plugins/inputs/syslog/sample.conf @@ -52,6 +52,9 @@ ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Framing technique used for messages transport ## Available settings are: ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 From 591513baef37461c87c2f359af68647abd7908e8 Mon Sep 17 00:00:00 2001 From: maxcillius Date: Wed, 20 May 2026 20:28:05 +0530 Subject: [PATCH 5/5] style(common.socket): wrap newPacketListener parameter list for linter Signed-off-by: maxcillius --- plugins/common/socket/datagram.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 9d4bcadb526fa..4c39c63367929 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -39,7 +39,14 @@ type packetListener struct { parsePool *pond.WorkerPool } -func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int, allowedSources []net.IP, multicastSource string, logger telegraf.Logger) *packetListener { +func newPacketListener( + encoding string, + maxDecompressionSize config.Size, + maxWorkers int, + allowedSources []net.IP, + multicastSource string, + logger telegraf.Logger, +) *packetListener { return &packetListener{ AllowedSources: allowedSources, MulticastSource: multicastSource,