Skip to content

Commit

Permalink
add unix datagram socket support
Browse files Browse the repository at this point in the history
  • Loading branch information
vektah authored and smira committed Oct 29, 2024
1 parent 516dfdc commit cee3b7c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 3 deletions.
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type transport struct {
func NewClient(addr string, options ...Option) *Client {
opts := ClientOptions{
Addr: addr,
AddrNetwork: DefaultNetwork,
MetricPrefix: DefaultMetricPrefix,
MaxPacketSize: DefaultMaxPacketSize,
FlushInterval: DefaultFlushInterval,
Expand Down Expand Up @@ -106,7 +107,7 @@ func NewClient(addr string, options ...Option) *Client {

for i := 0; i < opts.SendLoopCount; i++ {
c.trans.shutdownWg.Add(1)
go c.trans.sendLoop(opts.Addr, opts.ReconnectInterval, opts.RetryTimeout, opts.Logger)
go c.trans.sendLoop(opts.Addr, opts.AddrNetwork, opts.ReconnectInterval, opts.RetryTimeout, opts.Logger)
}

if opts.ReportInterval > 0 {
Expand Down
37 changes: 37 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ SOFTWARE.
*/

import (
"fmt"
"math/rand"
"net"
"strconv"
Expand Down Expand Up @@ -386,6 +387,42 @@ func BenchmarkSimple(b *testing.B) {
_ = inSocket.Close()
}

func BenchmarkSimpleUnixSocket(b *testing.B) {
socket := fmt.Sprintf("/tmp/go-statsd-%d", time.Now().UnixNano())
inSocket, err := net.ListenUnixgram("unixgram", &net.UnixAddr{Name: socket, Net: "unixgram"})
if err != nil {
b.Error(err)
return
}
if err := inSocket.SetReadBuffer(1024_000); err != nil {
b.Error(err)
return
}
go func() {
buf := make([]byte, 1500)
for {
_, err := inSocket.Read(buf)
if err != nil {
return
}
}
}()

c := NewClient(socket, Network("unixgram"), MetricPrefix("metricPrefix"),
MaxPacketSize(1432), FlushInterval(100*time.Millisecond), SendLoopCount(1))

b.ResetTimer()

for i := 0; i < b.N; i++ {
c.Incr("foo.bar.counter", 1)
c.Gauge("foo.bar.gauge", 42)
c.PrecisionTiming("foo.bar.timing", 153*time.Millisecond)
}
time.Sleep(1 * time.Millisecond)
_ = c.Close()
_ = inSocket.Close()
}

func BenchmarkComplexDelivery(b *testing.B) {
inSocket, err := net.ListenUDP("udp4", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Expand Down
4 changes: 2 additions & 2 deletions loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *transport) flushLoop(flushInterval time.Duration) {
}

// sendLoop handles packet delivery over UDP and periodic reconnects
func (t *transport) sendLoop(addr string, reconnectInterval, retryTimeout time.Duration, log SomeLogger) {
func (t *transport) sendLoop(addr string, network string, reconnectInterval, retryTimeout time.Duration, log SomeLogger) {
var (
sock net.Conn
err error
Expand Down Expand Up @@ -94,7 +94,7 @@ RECONNECT:
}()

var d net.Dialer
return d.DialContext(ctx, "udp", addr)
return d.DialContext(ctx, network, addr)
}()

if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
DefaultBufPoolCapacity = 20
DefaultSendQueueCapacity = 10
DefaultSendLoopCount = 1
DefaultNetwork = "udp"
)

// SomeLogger defines logging interface that allows using 3rd party loggers
Expand All @@ -53,6 +54,9 @@ type ClientOptions struct {
// Addr is statsd server address in "host:port" format
Addr string

// AddrNetwork is network type for the address. Defaults to udp.
AddrNetwork string

// MetricPrefix is metricPrefix to prepend to every metric being sent
//
// If not set defaults to empty string
Expand Down Expand Up @@ -261,3 +265,10 @@ func DefaultTags(tags ...Tag) Option {
c.DefaultTags = tags
}
}

// Network sets the network to use Dialing the statsd server
func Network(network string) Option {
return func(c *ClientOptions) {
c.AddrNetwork = network
}
}

0 comments on commit cee3b7c

Please sign in to comment.