diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index 42f87417c1997..4c39c63367929 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -15,6 +15,8 @@ import ( "time" "github.com/alitto/pond" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" @@ -23,6 +25,7 @@ import ( type packetListener struct { AllowedSources []net.IP + MulticastSource string Encoding string MaxDecompressionSize int64 SocketMode string @@ -36,9 +39,17 @@ type packetListener struct { parsePool *pond.WorkerPool } -func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int, allowedSources []net.IP, logger telegraf.Logger) *packetListener { +func newPacketListener( + encoding string, + maxDecompressionSize config.Size, + maxWorkers int, + allowedSources []net.IP, + multicastSource string, + logger telegraf.Logger, +) *packetListener { return &packetListener{ AllowedSources: allowedSources, + MulticastSource: multicastSource, Encoding: encoding, Log: logger, MaxDecompressionSize: int64(maxDecompressionSize), @@ -202,8 +213,16 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err return fmt.Errorf("resolving address of %q failed: %w", ifname, err) } } - conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr) + + if l.MulticastSource != "" { + conn, err = listenSSM(u.Scheme, iface, addr, l.MulticastSource) + } else { + conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr) + } if err != nil { + if conn != nil { + conn.Close() + } return fmt.Errorf("listening (udp multicast) failed: %w", err) } } else { @@ -280,3 +299,40 @@ func (l *packetListener) close() error { return nil } + +func listenSSM(network string, ifi *net.Interface, gaddr *net.UDPAddr, sourceAddr string) (*net.UDPConn, error) { + src := &net.UDPAddr{IP: net.ParseIP(sourceAddr)} + if src.IP == nil { + return nil, fmt.Errorf("invalid multicast_source address %q", sourceAddr) + } + + if network == "udp" { + if gaddr.IP.To4() != nil { + network = "udp4" + } else { + network = "udp6" + } + } + + conn, err := net.ListenUDP(network, gaddr) + if err != nil { + return nil, fmt.Errorf("creating UDP socket failed: %w", err) + } + + switch network { + case "udp4": + p := ipv4.NewPacketConn(conn) + if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { + return conn, fmt.Errorf("joining SSM group (IGMPv3) failed: %w", err) + } + case "udp6": + p := ipv6.NewPacketConn(conn) + if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil { + return conn, fmt.Errorf("joining SSM group (MLDv2) failed: %w", err) + } + default: + return conn, fmt.Errorf("ssm support failure for network %q, use udp4 or udp6", network) + } + + return conn, nil +} diff --git a/plugins/common/socket/socket.conf b/plugins/common/socket/socket.conf index 52d872da44384..d60913af2a33d 100644 --- a/plugins/common/socket/socket.conf +++ b/plugins/common/socket/socket.conf @@ -38,4 +38,7 @@ ## List of allowed source IP addresses for incoming packets/messages. ## If not specified or empty, all sources are allowed. - # allowed_sources = [] \ No newline at end of file + # allowed_sources = [] + + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" \ No newline at end of file diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index c0518c43f87e4..26e2ae6609fcc 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -38,6 +38,7 @@ type Config struct { MaxDecompressionSize config.Size `toml:"max_decompression_size"` MaxParallelParsers int `toml:"max_parallel_parsers"` AllowedSources []net.IP `toml:"allowed_sources"` + MulticastSource string `toml:"multicast_source"` common_tls.ServerConfig } @@ -125,19 +126,19 @@ func (s *Socket) Setup() error { } s.listener = l case "udp", "udp4", "udp6": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil { return err } s.listener = l case "ip", "ip4", "ip6": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupIP(s.url); err != nil { return err } s.listener = l case "unixgram": - l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log) + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log) if err := l.setupUnixgram(s.url, s.SocketMode, int(s.ReadBufferSize)); err != nil { return err } diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index 0255977c1f24a..5b28a9bd902af 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -89,6 +89,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Message splitting strategy and corresponding settings for stream sockets ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet ## listeners such as udp. diff --git a/plugins/inputs/socket_listener/sample.conf b/plugins/inputs/socket_listener/sample.conf index ee326ed20be00..415577af8ea1a 100644 --- a/plugins/inputs/socket_listener/sample.conf +++ b/plugins/inputs/socket_listener/sample.conf @@ -55,6 +55,9 @@ ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Message splitting strategy and corresponding settings for stream sockets ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet ## listeners such as udp. diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index af2fc26a364c5..98d8100b4d84d 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -94,6 +94,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Framing technique used for messages transport ## Available settings are: ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 diff --git a/plugins/inputs/syslog/sample.conf b/plugins/inputs/syslog/sample.conf index c60d886e12c92..cde4ec0c6f1d6 100644 --- a/plugins/inputs/syslog/sample.conf +++ b/plugins/inputs/syslog/sample.conf @@ -52,6 +52,9 @@ ## If not specified or empty, all sources are allowed. # allowed_sources = [] + ## Source IP for Source-Specific Multicast (SSM / IGMPv3) + # multicast_source = "" + ## Framing technique used for messages transport ## Available settings are: ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1