Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"time"

"github.com/alitto/pond"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
Expand All @@ -23,6 +25,7 @@ import (

type packetListener struct {
AllowedSources []net.IP
MulticastSource string
Encoding string
MaxDecompressionSize int64
SocketMode string
Expand All @@ -36,9 +39,17 @@ type packetListener struct {
parsePool *pond.WorkerPool
}

func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int, allowedSources []net.IP, logger telegraf.Logger) *packetListener {
func newPacketListener(
encoding string,
maxDecompressionSize config.Size,
maxWorkers int,
allowedSources []net.IP,
multicastSource string,
logger telegraf.Logger,
) *packetListener {
return &packetListener{
AllowedSources: allowedSources,
MulticastSource: multicastSource,
Encoding: encoding,
Log: logger,
MaxDecompressionSize: int64(maxDecompressionSize),
Expand Down Expand Up @@ -202,8 +213,16 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
return fmt.Errorf("resolving address of %q failed: %w", ifname, err)
}
}
conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr)

if l.MulticastSource != "" {
conn, err = listenSSM(u.Scheme, iface, addr, l.MulticastSource)
} else {
conn, err = net.ListenMulticastUDP(u.Scheme, iface, addr)
}
if err != nil {
if conn != nil {
conn.Close()
}
return fmt.Errorf("listening (udp multicast) failed: %w", err)
}
} else {
Expand Down Expand Up @@ -280,3 +299,40 @@ func (l *packetListener) close() error {

return nil
}

func listenSSM(network string, ifi *net.Interface, gaddr *net.UDPAddr, sourceAddr string) (*net.UDPConn, error) {
Comment thread
srebhan marked this conversation as resolved.
src := &net.UDPAddr{IP: net.ParseIP(sourceAddr)}
if src.IP == nil {
return nil, fmt.Errorf("invalid multicast_source address %q", sourceAddr)
}

if network == "udp" {
if gaddr.IP.To4() != nil {
network = "udp4"
} else {
network = "udp6"
}
}

conn, err := net.ListenUDP(network, gaddr)
if err != nil {
return nil, fmt.Errorf("creating UDP socket failed: %w", err)
}

switch network {
case "udp4":
Comment thread
srebhan marked this conversation as resolved.
p := ipv4.NewPacketConn(conn)
if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil {
return conn, fmt.Errorf("joining SSM group (IGMPv3) failed: %w", err)
}
case "udp6":
p := ipv6.NewPacketConn(conn)
if err := p.JoinSourceSpecificGroup(ifi, gaddr, src); err != nil {
return conn, fmt.Errorf("joining SSM group (MLDv2) failed: %w", err)
}
default:
return conn, fmt.Errorf("ssm support failure for network %q, use udp4 or udp6", network)
}

return conn, nil
}
5 changes: 4 additions & 1 deletion plugins/common/socket/socket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@

## List of allowed source IP addresses for incoming packets/messages.
## If not specified or empty, all sources are allowed.
# allowed_sources = []
# allowed_sources = []

## Source IP for Source-Specific Multicast (SSM / IGMPv3)
# multicast_source = ""
7 changes: 4 additions & 3 deletions plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
MaxParallelParsers int `toml:"max_parallel_parsers"`
AllowedSources []net.IP `toml:"allowed_sources"`
MulticastSource string `toml:"multicast_source"`
common_tls.ServerConfig
}

Expand Down Expand Up @@ -125,19 +126,19 @@ func (s *Socket) Setup() error {
}
s.listener = l
case "udp", "udp4", "udp6":
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log)
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log)
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
return err
}
s.listener = l
case "ip", "ip4", "ip6":
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log)
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log)
if err := l.setupIP(s.url); err != nil {
return err
}
s.listener = l
case "unixgram":
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.log)
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers, s.AllowedSources, s.MulticastSource, s.log)
if err := l.setupUnixgram(s.url, s.SocketMode, int(s.ReadBufferSize)); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/socket_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If not specified or empty, all sources are allowed.
# allowed_sources = []

## Source IP for Source-Specific Multicast (SSM / IGMPv3)
# multicast_source = ""

## Message splitting strategy and corresponding settings for stream sockets
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
## listeners such as udp.
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/socket_listener/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
## If not specified or empty, all sources are allowed.
# allowed_sources = []

## Source IP for Source-Specific Multicast (SSM / IGMPv3)
# multicast_source = ""

## Message splitting strategy and corresponding settings for stream sockets
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
## listeners such as udp.
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/syslog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If not specified or empty, all sources are allowed.
# allowed_sources = []

## Source IP for Source-Specific Multicast (SSM / IGMPv3)
# multicast_source = ""

## Framing technique used for messages transport
## Available settings are:
## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/syslog/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
## If not specified or empty, all sources are allowed.
# allowed_sources = []

## Source IP for Source-Specific Multicast (SSM / IGMPv3)
# multicast_source = ""

## Framing technique used for messages transport
## Available settings are:
## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1
Expand Down
Loading