From 67857aed23a6e6fdddb5cd2de5989709905a4685 Mon Sep 17 00:00:00 2001 From: Phil Pearl Date: Mon, 29 Apr 2024 17:14:07 +0100 Subject: [PATCH 1/6] nsqd: get benchmarks running - Close sockets to ensure timely shutdown. This is the major improvement. Without this benchmarks would regularly time out - Switch to a logger with no output to make the benchmark output readable - Fix one unix socket benchmark to use unix socket opening function - Fixed BenchmarkGUID to be a valid test - and added a similar TestGUID With these changes I've also found I need to restrict b.N or NSQD takes too long to exit when N gets large. I've seen this in tests that publish only and do not receive messages. A large number of messages then need to be persisted and this takes tens of seconds. Once these changes are made `go test -bench . -run ^$ -benchtime 1000x` completes cleanly for me. --- internal/test/logger.go | 10 ++++++ nsqd/guid_test.go | 28 +++++++++++------ nsqd/http_test.go | 7 ++--- nsqd/protocol_v2_test.go | 43 +++++++++++++++----------- nsqd/protocol_v2_unixsocket_test.go | 47 +++++++++++++++-------------- nsqd/topic_test.go | 4 +-- 6 files changed, 84 insertions(+), 55 deletions(-) diff --git a/internal/test/logger.go b/internal/test/logger.go index c64a615eb..48c5105d4 100644 --- a/internal/test/logger.go +++ b/internal/test/logger.go @@ -20,3 +20,13 @@ func (tl *testLogger) Output(maxdepth int, s string) error { func NewTestLogger(tbl tbLog) Logger { return &testLogger{tbl} } + +// NilLogger is a Logger that produces no output. It may be useful in +// benchmarks, where otherwise the output would be distracting. The benchmarking +// package restricts output to 10 lines anyway, so logging via the testing.B is +// not very useful. +type NilLogger struct{} + +func (l NilLogger) Output(maxdepth int, s string) error { + return nil +} diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index dd98d9638..2219eeb28 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -23,20 +23,30 @@ func BenchmarkGUIDUnsafe(b *testing.B) { } func BenchmarkGUID(b *testing.B) { - var okays, errors, fails int64 var previd guid - factory := &guidFactory{} + factory := NewGUIDFactory(37) for i := 0; i < b.N; i++ { id, err := factory.NewGUID() if err != nil { - errors++ - } else if id == previd { - fails++ - b.Fail() - } else { - okays++ + b.Fatal(err) + } else if id <= previd { + b.Fatal("repeated or descending id") } + previd = id id.Hex() } - b.Logf("okays=%d errors=%d bads=%d", okays, errors, fails) +} + +func TestGUID(t *testing.T) { + factory := NewGUIDFactory(1) + var previd guid + for i := 0; i < 1000; i++ { + id, err := factory.NewGUID() + if err != nil { + t.Fatal(err) + } else if id <= previd { + t.Fatal("repeated or descending id") + } + previd = id + } } diff --git a/nsqd/http_test.go b/nsqd/http_test.go index 5fdb9b3d3..f77ba5fdd 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -519,9 +519,8 @@ func TestHTTPClientStats(t *testing.T) { var d struct { Topics []struct { Channels []struct { - ClientCount int `json:"client_count"` - Clients []struct { - } `json:"clients"` + ClientCount int `json:"client_count"` + Clients []struct{} `json:"clients"` } `json:"channels"` } `json:"topics"` Memory *struct{} `json:"memory,omitempty"` @@ -899,7 +898,7 @@ func BenchmarkHTTPpub(b *testing.B) { var wg sync.WaitGroup b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 5ef34e93e..7b340f3c2 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1616,7 +1616,7 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { func BenchmarkProtocolV2Exec(b *testing.B) { b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} nsqd, _ := New(opts) p := &protocolV2{nsqd} c := newClientV2(0, nil, nsqd) @@ -1630,11 +1630,10 @@ func BenchmarkProtocolV2Exec(b *testing.B) { func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() size := 200 batchSize := int(opts.MaxBodySize) / (size + 4) - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1644,7 +1643,7 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { batch[i] = msg } b.SetBytes(int64(len(msg))) - b.StartTimer() + b.ResetTimer() for j := 0; j < numTopics; j++ { topicName := fmt.Sprintf("bench_v2_pub_multi_topic_%d_%d", j, time.Now().Unix()) @@ -1657,7 +1656,8 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / numTopics / batchSize - wg.Add(1) + var subWG sync.WaitGroup + subWG.Add(1) go func() { for i := 0; i < num; i++ { cmd, _ := nsq.MultiPublish(topicName, batch) @@ -1670,9 +1670,9 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { panic(err.Error()) } } - wg.Done() + subWG.Done() }() - wg.Add(1) + subWG.Add(1) go func() { for i := 0; i < num; i++ { resp, err := nsq.ReadResponse(rw) @@ -1684,8 +1684,10 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { panic("invalid response") } } - wg.Done() + subWG.Done() }() + subWG.Wait() + conn.Close() wg.Done() }() } @@ -1693,6 +1695,8 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { wg.Wait() b.StopTimer() + // This benchmark does not drain the receive side of the connection, so + // exiting here can take some time as it persists the messages to disk. nsqd.Exit() } @@ -1705,10 +1709,9 @@ func BenchmarkProtocolV2PubMultiTopic32(b *testing.B) { benchmarkProtocolV2PubMu func benchmarkProtocolV2Pub(b *testing.B, size int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() batchSize := int(opts.MaxBodySize) / (size + 4) - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1719,19 +1722,21 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { } topicName := "bench_v2_pub" + strconv.Itoa(int(time.Now().Unix())) b.SetBytes(int64(len(msg))) - b.StartTimer() + b.ResetTimer() for j := 0; j < runtime.GOMAXPROCS(0); j++ { wg.Add(1) go func() { + var subWg sync.WaitGroup conn, err := mustConnectNSQD(tcpAddr) if err != nil { panic(err.Error()) } + defer conn.Close() rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / runtime.GOMAXPROCS(0) / batchSize - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { cmd, _ := nsq.MultiPublish(topicName, batch) @@ -1744,9 +1749,9 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { panic(err.Error()) } } - wg.Done() + subWg.Done() }() - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { resp, err := nsq.ReadResponse(rw) @@ -1758,8 +1763,9 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { panic("invalid response") } } - wg.Done() + subWg.Done() }() + subWg.Wait() wg.Done() }() } @@ -1788,7 +1794,7 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) { var wg sync.WaitGroup b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1824,8 +1830,9 @@ func benchmarkProtocolV2Sub(b *testing.B, size int) { func subWorker(n int, workers int, tcpAddr net.Addr, topicName string, rdyChan chan int, goChan chan int) { conn, err := mustConnectNSQD(tcpAddr) if err != nil { - panic(err.Error()) + panic(fmt.Sprintf("connecting to %s: %s", tcpAddr, err.Error())) } + defer conn.Close() rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriterSize(conn, 65536)) identify(nil, conn, nil, frameTypeResponse) @@ -1882,7 +1889,7 @@ func benchmarkProtocolV2MultiSub(b *testing.B, num int) { b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) diff --git a/nsqd/protocol_v2_unixsocket_test.go b/nsqd/protocol_v2_unixsocket_test.go index 85820aca6..89b54ff20 100644 --- a/nsqd/protocol_v2_unixsocket_test.go +++ b/nsqd/protocol_v2_unixsocket_test.go @@ -1442,7 +1442,7 @@ func testUnixSocketIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetCon func BenchmarkUnixSocketProtocolV2Exec(b *testing.B) { b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} nsqd, _ := New(opts) p := &protocolV2{nsqd} c := newClientV2(0, nil, nsqd) @@ -1456,11 +1456,10 @@ func BenchmarkUnixSocketProtocolV2Exec(b *testing.B) { func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() size := 200 batchSize := int(opts.MaxBodySize) / (size + 4) - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) addr, _, nsqd := mustUnixSocketStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1470,20 +1469,22 @@ func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { batch[i] = msg } b.SetBytes(int64(len(msg))) - b.StartTimer() + b.ResetTimer() for j := 0; j < numTopics; j++ { topicName := fmt.Sprintf("bench_v2_pub_multi_topic_%d_%d", j, time.Now().Unix()) wg.Add(1) go func() { + var subWg sync.WaitGroup conn, err := mustUnixSocketConnectNSQD(addr) if err != nil { panic(err.Error()) } + defer conn.Close() rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / numTopics / batchSize - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { cmd, _ := nsq.MultiPublish(topicName, batch) @@ -1496,9 +1497,9 @@ func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { panic(err.Error()) } } - wg.Done() + subWg.Done() }() - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { resp, err := nsq.ReadResponse(rw) @@ -1510,8 +1511,9 @@ func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { panic("invalid response") } } - wg.Done() + subWg.Done() }() + subWg.Wait() wg.Done() }() } @@ -1543,10 +1545,9 @@ func BenchmarkUnixSocketProtocolV2PubMultiTopic32(b *testing.B) { func benchmarkUnixSocketProtocolV2Pub(b *testing.B, size int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() batchSize := int(opts.MaxBodySize) / (size + 4) - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) addr, _, nsqd := mustUnixSocketStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1557,19 +1558,21 @@ func benchmarkUnixSocketProtocolV2Pub(b *testing.B, size int) { } topicName := "bench_v2_pub" + strconv.Itoa(int(time.Now().Unix())) b.SetBytes(int64(len(msg))) - b.StartTimer() + b.ResetTimer() for j := 0; j < runtime.GOMAXPROCS(0); j++ { wg.Add(1) go func() { + var subWg sync.WaitGroup conn, err := mustUnixSocketConnectNSQD(addr) if err != nil { panic(err.Error()) } + defer conn.Close() rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) num := b.N / runtime.GOMAXPROCS(0) / batchSize - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { cmd, _ := nsq.MultiPublish(topicName, batch) @@ -1582,9 +1585,9 @@ func benchmarkUnixSocketProtocolV2Pub(b *testing.B, size int) { panic(err.Error()) } } - wg.Done() + subWg.Done() }() - wg.Add(1) + subWg.Add(1) go func() { for i := 0; i < num; i++ { resp, err := nsq.ReadResponse(rw) @@ -1596,8 +1599,9 @@ func benchmarkUnixSocketProtocolV2Pub(b *testing.B, size int) { panic("invalid response") } } - wg.Done() + subWg.Done() }() + subWg.Wait() wg.Done() }() } @@ -1630,9 +1634,8 @@ func BenchmarkUnixSocketProtocolV2Pub1m(b *testing.B) { benchmarkUnixSocketProto func benchmarkUnixSocketProtocolV2Sub(b *testing.B, size int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) addr, _, nsqd := mustUnixSocketStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1651,12 +1654,12 @@ func benchmarkUnixSocketProtocolV2Sub(b *testing.B, size int) { for j := 0; j < workers; j++ { wg.Add(1) go func() { - subWorker(b.N, workers, addr, topicName, rdyChan, goChan) + subUnixSocketWorker(b.N, workers, addr, topicName, rdyChan, goChan) wg.Done() }() <-rdyChan } - b.StartTimer() + b.ResetTimer() close(goChan) wg.Wait() @@ -1670,6 +1673,7 @@ func subUnixSocketWorker(n int, workers int, addr net.Addr, topicName string, rd if err != nil { panic(err.Error()) } + defer conn.Close() rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriterSize(conn, 65536)) identify(nil, conn, nil, frameTypeResponse) @@ -1729,10 +1733,9 @@ func BenchmarkUnixSocketProtocolV2Sub1m(b *testing.B) { benchmarkUnixSocketProto func benchmarkUnixSocketProtocolV2MultiSub(b *testing.B, num int) { var wg sync.WaitGroup - b.StopTimer() opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) addr, _, nsqd := mustUnixSocketStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -1760,7 +1763,7 @@ func benchmarkUnixSocketProtocolV2MultiSub(b *testing.B, num int) { <-rdyChan } } - b.StartTimer() + b.ResetTimer() close(goChan) wg.Wait() diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index d78fceae9..7cc93a78a 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -199,7 +199,7 @@ func BenchmarkTopicPut(b *testing.B) { b.StopTimer() topicName := "bench_topic_put" + strconv.Itoa(b.N) opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -218,7 +218,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) { topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N) channelName := "bench" opts := NewOptions() - opts.Logger = test.NewTestLogger(b) + opts.Logger = test.NilLogger{} opts.MemQueueSize = int64(b.N) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) From 8e64930b0752163a2797022e080e32479f941e92 Mon Sep 17 00:00:00 2001 From: Phil Pearl Date: Mon, 29 Apr 2024 17:43:04 +0100 Subject: [PATCH 2/6] nsqd: add a benchmark for compression --- nsqd/protocol_v2_test.go | 88 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 7b340f3c2..de7c0f8c4 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/flate" "crypto/tls" + _ "embed" "encoding/json" "errors" "fmt" @@ -1931,3 +1932,90 @@ func BenchmarkProtocolV2MultiSub2(b *testing.B) { benchmarkProtocolV2MultiSub(b func BenchmarkProtocolV2MultiSub4(b *testing.B) { benchmarkProtocolV2MultiSub(b, 4) } func BenchmarkProtocolV2MultiSub8(b *testing.B) { benchmarkProtocolV2MultiSub(b, 8) } func BenchmarkProtocolV2MultiSub16(b *testing.B) { benchmarkProtocolV2MultiSub(b, 16) } + +//go:embed protocol_v2_test.go +var testData []byte + +func BenchmarkCompress(b *testing.B) { + // This benchmark uses the go-nsq library, so the benefits of the + // compression chosen are somewhat limited by the implementation in the + // library. At time of writing the library doesn't properly buffer Snappy, + // and is using a slower flate implemenation than that used by nsqd. + for _, compression := range []string{"none", "snappy", "deflate1", "deflate3", "deflate5", "deflate6", "deflate9"} { + b.Run(compression, func(b *testing.B) { + opts := NewOptions() + defer os.RemoveAll(opts.DataPath) + opts.Logger = test.NilLogger{} + + tcpAddr, _, nsqd := mustStartNSQD(opts) + defer nsqd.Exit() + + cfg := nsq.NewConfig() + + switch compression { + case "none": + case "snappy": + cfg.Snappy = true + case "deflate1": + cfg.Deflate = true + cfg.DeflateLevel = 1 + case "deflate3": + cfg.Deflate = true + cfg.DeflateLevel = 3 + case "deflate5": + cfg.Deflate = true + cfg.DeflateLevel = 5 + case "deflate6": + cfg.Deflate = true + cfg.DeflateLevel = 6 + case "deflate9": + cfg.Deflate = true + cfg.DeflateLevel = 9 + default: + b.Fatalf("unknown compression: %s", compression) + } + + consumer, err := nsq.NewConsumer("test", "ch", cfg) + if err != nil { + b.Fatal(err) + } + defer consumer.Stop() + consumer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo) + + var wg sync.WaitGroup + wg.Add(1) + + var count int32 + + consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { + if atomic.AddInt32(&count, 1) == int32(b.N) { + wg.Done() + } + return nil + })) + + if err := consumer.ConnectToNSQD(tcpAddr.String()); err != nil { + b.Fatal(err) + } + + producer, err := nsq.NewProducer(tcpAddr.String(), cfg) + if err != nil { + b.Fatal(err) + } + producer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo) + defer producer.Stop() + + b.SetBytes(int64(len(testData))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := producer.Publish("test", testData); err != nil { + b.Fatal(err) + } + } + + wg.Wait() + }) + } +} From c9b5c4f60a5b838a893e854dd72f0cb79b765cec Mon Sep 17 00:00:00 2001 From: Phil Pearl Date: Mon, 29 Apr 2024 19:09:30 +0100 Subject: [PATCH 3/6] nsqd: Switch to Klaus Post's flate compression library This is a dropin replacement for the standard library flate compression. --- apps/nsq_to_file/file_logger.go | 6 +++--- go.mod | 1 + go.sum | 6 ++---- internal/http_api/compress.go | 5 +++-- nsqd/client_v2.go | 2 +- nsqd/protocol_v2_test.go | 6 +++--- nsqd/protocol_v2_unixsocket_test.go | 16 ++++++++++++++-- 7 files changed, 27 insertions(+), 15 deletions(-) diff --git a/apps/nsq_to_file/file_logger.go b/apps/nsq_to_file/file_logger.go index ae16b4082..d516d03a9 100644 --- a/apps/nsq_to_file/file_logger.go +++ b/apps/nsq_to_file/file_logger.go @@ -1,7 +1,6 @@ package main import ( - "compress/gzip" "errors" "fmt" "io" @@ -11,6 +10,7 @@ import ( "strings" "time" + "github.com/klauspost/compress/gzip" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/lg" ) @@ -331,7 +331,7 @@ func (f *FileLogger) updateFile() { } else { openFlag |= os.O_APPEND } - f.out, err = os.OpenFile(absFilename, openFlag, 0666) + f.out, err = os.OpenFile(absFilename, openFlag, 0o666) if err != nil { if os.IsExist(err) { f.logf(lg.WARN, "[%s/%s] working file already exists: %s", f.topic, f.opts.Channel, absFilename) @@ -369,7 +369,7 @@ func (f *FileLogger) updateFile() { func makeDirFromPath(logf lg.AppLogFunc, path string) error { dir, _ := filepath.Split(path) if dir != "" { - return os.MkdirAll(dir, 0770) + return os.MkdirAll(dir, 0o770) } return nil } diff --git a/go.mod b/go.mod index 247d4c928..dbcbd9704 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/judwhite/go-svc v1.2.1 github.com/julienschmidt/httprouter v1.3.0 + github.com/klauspost/compress v1.17.8 github.com/mreiferson/go-options v1.0.0 github.com/nsqio/go-diskqueue v1.1.0 github.com/nsqio/go-nsq v1.1.0 diff --git a/go.sum b/go.sum index eb4b44846..c53facc12 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= -github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0= @@ -18,6 +16,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMoz5/VWs= github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w= github.com/mreiferson/go-svc v1.2.2-0.20210815184239-7a96e00010f6 h1:NbuBXARvEXrYZ1SzN53ZpObeuwGhl1zvs/C+kzCggrQ= @@ -32,8 +32,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 h1:SeSEfdIxyvwGJliREIJhRPPXvW6sDlLT+UQ3B0hD0NA= -golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/http_api/compress.go b/internal/http_api/compress.go index ebecad9f1..651a6012c 100644 --- a/internal/http_api/compress.go +++ b/internal/http_api/compress.go @@ -7,11 +7,12 @@ package http_api import ( - "compress/flate" - "compress/gzip" "io" "net/http" "strings" + + "github.com/klauspost/compress/flate" + "github.com/klauspost/compress/gzip" ) type compressResponseWriter struct { diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 32250e72c..2eabef269 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -2,7 +2,6 @@ package nsqd import ( "bufio" - "compress/flate" "crypto/tls" "fmt" "net" @@ -12,6 +11,7 @@ import ( "time" "github.com/golang/snappy" + "github.com/klauspost/compress/flate" "github.com/nsqio/nsq/internal/auth" ) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index de7c0f8c4..bf881d836 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -3,7 +3,6 @@ package nsqd import ( "bufio" "bytes" - "compress/flate" "crypto/tls" _ "embed" "encoding/json" @@ -25,6 +24,7 @@ import ( "time" "github.com/golang/snappy" + "github.com/klauspost/compress/flate" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/test" @@ -968,7 +968,6 @@ func TestTLSAuthRequire(t *testing.T) { t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) test.Equal(t, []byte("OK"), data) - } func TestTLSAuthRequireVerify(t *testing.T) { @@ -1494,7 +1493,8 @@ func TestClientAuth(t *testing.T) { } func runAuthTest(t *testing.T, authResponse string, authSecret string, authError string, - authSuccess string, tlsEnabled bool, commonName string) { + authSuccess string, tlsEnabled bool, commonName string, +) { var err error var expectedRemoteIP string expectedTLS := "false" diff --git a/nsqd/protocol_v2_unixsocket_test.go b/nsqd/protocol_v2_unixsocket_test.go index 89b54ff20..6c3f8485d 100644 --- a/nsqd/protocol_v2_unixsocket_test.go +++ b/nsqd/protocol_v2_unixsocket_test.go @@ -3,7 +3,6 @@ package nsqd import ( "bufio" "bytes" - "compress/flate" "crypto/tls" "encoding/json" "errors" @@ -21,6 +20,7 @@ import ( "time" "github.com/golang/snappy" + "github.com/klauspost/compress/flate" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/test" @@ -903,7 +903,6 @@ func TestUnixSocketTLSAuthRequire(t *testing.T) { t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) test.Equal(t, []byte("OK"), data) - } func TestUnixSocketTLSAuthRequireVerify(t *testing.T) { @@ -1527,18 +1526,23 @@ func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { func BenchmarkUnixSocketProtocolV2PubMultiTopic1(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 1) } + func BenchmarkUnixSocketkProtocolV2PubMultiTopic2(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 2) } + func BenchmarkUnixSocketProtocolV2PubMultiTopic4(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 4) } + func BenchmarkUnixSocketProtocolV2PubMultiTopic8(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 8) } + func BenchmarkUnixSocketProtocolV2PubMultiTopic16(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 16) } + func BenchmarkUnixSocketProtocolV2PubMultiTopic32(b *testing.B) { benchmarkUnixSocketProtocolV2PubMultiTopic(b, 32) } @@ -1624,9 +1628,11 @@ func BenchmarkUnixSocketProtocolV2Pub64k(b *testing.B) { benchmarkUnixSocketProt func BenchmarkUnixSocketProtocolV2Pub128k(b *testing.B) { benchmarkUnixSocketProtocolV2Pub(b, 128*1024) } + func BenchmarkUnixSocketProtocolV2Pub256k(b *testing.B) { benchmarkUnixSocketProtocolV2Pub(b, 256*1024) } + func BenchmarkUnixSocketProtocolV2Pub512k(b *testing.B) { benchmarkUnixSocketProtocolV2Pub(b, 512*1024) } @@ -1723,9 +1729,11 @@ func BenchmarkUnixSocketProtocolV2Sub64k(b *testing.B) { benchmarkUnixSocketProt func BenchmarkUnixSocketProtocolV2Sub128k(b *testing.B) { benchmarkUnixSocketProtocolV2Sub(b, 128*1024) } + func BenchmarkUnixSocketProtocolV2Sub256k(b *testing.B) { benchmarkUnixSocketProtocolV2Sub(b, 256*1024) } + func BenchmarkUnixSocketProtocolV2Sub512k(b *testing.B) { benchmarkUnixSocketProtocolV2Sub(b, 512*1024) } @@ -1775,15 +1783,19 @@ func benchmarkUnixSocketProtocolV2MultiSub(b *testing.B, num int) { func BenchmarkUnixSocketProtocolV2MultiSub2(b *testing.B) { benchmarkUnixSocketProtocolV2MultiSub(b, 2) } + func BenchmarkUnixSocketProtocolV2MultiSub1(b *testing.B) { benchmarkUnixSocketProtocolV2MultiSub(b, 1) } + func BenchmarkUnixSocketProtocolV2MultiSub4(b *testing.B) { benchmarkUnixSocketProtocolV2MultiSub(b, 4) } + func BenchmarkUnixSocketProtocolV2MultiSub8(b *testing.B) { benchmarkUnixSocketProtocolV2MultiSub(b, 8) } + func BenchmarkUnixSocketProtocolV2MultiSub16(b *testing.B) { benchmarkUnixSocketProtocolV2MultiSub(b, 16) } From 606f4875874830ceab2ab52e5042dc15f5c3cffa Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 12 May 2024 13:10:19 +0200 Subject: [PATCH 4/6] fix connection issues on macOS --- nsqd/http_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nsqd/http_test.go b/nsqd/http_test.go index f77ba5fdd..7b92f5910 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -905,7 +905,12 @@ func BenchmarkHTTPpub(b *testing.B) { msg := make([]byte, 256) topicName := "bench_http_pub" + strconv.Itoa(int(time.Now().Unix())) url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName) - client := &http.Client{} + client := &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 1000, + MaxIdleConns: 0, + }, + } b.SetBytes(int64(len(msg))) b.StartTimer() From 1920b60f09d0ed384fd726552f9d4582254f4a04 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 12 May 2024 13:39:49 +0200 Subject: [PATCH 5/6] updates for newer go-nsq --- bench/bench_channels/bench_channels.go | 7 +- bench/bench_reader/bench_reader.go | 7 +- bench/bench_writer/bench_writer.go | 5 +- nsqd/protocol_v2_test.go | 98 +++++++++++++------------- nsqd/protocol_v2_unixsocket_test.go | 88 +++++++++++------------ nsqlookupd/nsqlookupd_test.go | 20 +++--- 6 files changed, 115 insertions(+), 110 deletions(-) diff --git a/bench/bench_channels/bench_channels.go b/bench/bench_channels/bench_channels.go index 54a196637..163368498 100644 --- a/bench/bench_channels/bench_channels.go +++ b/bench/bench_channels/bench_channels.go @@ -14,6 +14,7 @@ import ( var ( num = flag.Int("num", 10000, "num channels") tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", ": to connect to nsqd") + maxMsgSize = int32(1024 * 1024) ) func main() { @@ -56,10 +57,10 @@ func subWorker(n int, tcpAddr string, <-goChan nsq.Ready(rdyCount).WriteTo(rw) rw.Flush() - nsq.ReadResponse(rw) - nsq.ReadResponse(rw) + nsq.ReadResponse(rw, maxMsgSize) + nsq.ReadResponse(rw, maxMsgSize) for { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } diff --git a/bench/bench_reader/bench_reader.go b/bench/bench_reader/bench_reader.go index c1aee491b..1b45692dd 100644 --- a/bench/bench_reader/bench_reader.go +++ b/bench/bench_reader/bench_reader.go @@ -23,6 +23,7 @@ var ( channel = flag.String("channel", "ch", "channel to receive messages on") deadline = flag.String("deadline", "", "deadline to start the benchmark run") rdy = flag.Int("rdy", 2500, "RDY count to use") + maxMsgSize = int32(1024 * 1024) ) var totalMsgCount int64 @@ -86,15 +87,15 @@ func subWorker(td time.Duration, workers int, tcpAddr string, topic string, chan <-goChan nsq.Ready(*rdy).WriteTo(rw) rw.Flush() - nsq.ReadResponse(rw) - nsq.ReadResponse(rw) + nsq.ReadResponse(rw, maxMsgSize) + nsq.ReadResponse(rw, maxMsgSize) var msgCount int64 go func() { time.Sleep(td) conn.Close() }() for { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { break diff --git a/bench/bench_writer/bench_writer.go b/bench/bench_writer/bench_writer.go index c4e619f3a..7e4f9ea4e 100644 --- a/bench/bench_writer/bench_writer.go +++ b/bench/bench_writer/bench_writer.go @@ -21,6 +21,7 @@ var ( size = flag.Int("size", 200, "size of messages") batchSize = flag.Int("batch-size", 200, "batch size of messages") deadline = flag.String("deadline", "", "deadline to start the benchmark run") + maxMsgSize = int32(1024 * 1024) ) var totalMsgCount int64 @@ -87,7 +88,7 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte, rdyChan <- 1 <-goChan rw.Flush() - nsq.ReadResponse(rw) + nsq.ReadResponse(rw, maxMsgSize) var msgCount int64 endTime := time.Now().Add(td) for { @@ -100,7 +101,7 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte, if err != nil { panic(err.Error()) } - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index bf881d836..a10fbf75f 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -30,6 +30,8 @@ import ( "github.com/nsqio/nsq/internal/test" ) +var maxMsgSize = int32(1024 * 1024) + func mustStartNSQD(opts *Options) (net.Addr, net.Addr, *NSQD) { opts.TCPAddress = "127.0.0.1:0" opts.HTTPAddress = "127.0.0.1:0" @@ -73,7 +75,7 @@ func identify(t *testing.T, conn io.ReadWriter, extra map[string]interface{}, f cmd, _ := nsq.Identify(ci) _, err := cmd.WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, err := nsq.UnpackResponse(resp) test.Nil(t, err) @@ -99,13 +101,13 @@ func authCmd(t *testing.T, conn io.ReadWriter, authSecret string, expectSuccess func subFail(t *testing.T, conn io.ReadWriter, topicName string, channelName string) { _, err := nsq.Subscribe(topicName, channelName).WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, _, _ := nsq.UnpackResponse(resp) test.Equal(t, frameTypeError, frameType) } func readValidate(t *testing.T, conn io.Reader, f int32, d string) []byte { - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, err := nsq.UnpackResponse(resp) test.Nil(t, err) @@ -149,7 +151,7 @@ func TestBasicV2(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -188,7 +190,7 @@ func TestMultipleConsumerV2(t *testing.T) { test.Nil(t, err) go func(c net.Conn) { - resp, err := nsq.ReadResponse(c) + resp, err := nsq.ReadResponse(c, maxMsgSize) test.Nil(t, err) _, data, err := nsq.UnpackResponse(resp) test.Nil(t, err) @@ -236,7 +238,7 @@ func TestClientTimeout(t *testing.T) { case <-timer: t.Fatalf("test timed out") default: - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) if err != nil { goto done } @@ -265,7 +267,7 @@ func TestClientHeartbeat(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) test.Equal(t, []byte("_heartbeat_"), data) @@ -386,7 +388,7 @@ func TestPausing(t *testing.T) { topic.PutMessage(msg) // receive the first message via the client, finish it, and send new RDY - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) msg, _ = decodeMessage(data) test.Equal(t, []byte("test body"), msg.Body) @@ -421,7 +423,7 @@ func TestPausing(t *testing.T) { msg = NewMessage(topic.GenerateID(), []byte("test body3")) topic.PutMessage(msg) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) _, data, _ = nsq.UnpackResponse(resp) msg, _ = decodeMessage(data) test.Equal(t, []byte("test body3"), msg.Body) @@ -465,7 +467,7 @@ func TestSizeLimits(t *testing.T) { // PUB that's valid nsq.Publish(topicName, make([]byte, 95)).WriteTo(conn) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -473,7 +475,7 @@ func TestSizeLimits(t *testing.T) { // PUB that's invalid (too big) nsq.Publish(topicName, make([]byte, 105)).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -486,7 +488,7 @@ func TestSizeLimits(t *testing.T) { // PUB thats empty nsq.Publish(topicName, []byte{}).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -504,7 +506,7 @@ func TestSizeLimits(t *testing.T) { } cmd, _ := nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -517,7 +519,7 @@ func TestSizeLimits(t *testing.T) { } cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -536,7 +538,7 @@ func TestSizeLimits(t *testing.T) { mpub = append(mpub, []byte{}) cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -554,7 +556,7 @@ func TestSizeLimits(t *testing.T) { } cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -580,7 +582,7 @@ func TestDPUB(t *testing.T) { // valid nsq.DeferredPublish(topicName, time.Second, make([]byte, 100)).WriteTo(conn) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -597,7 +599,7 @@ func TestDPUB(t *testing.T) { // duration out of range nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -630,7 +632,7 @@ func TestTouch(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -681,7 +683,7 @@ func TestMaxRdyCount(t *testing.T) { _, err = nsq.Ready(int(opts.MaxRdyCount)).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -691,7 +693,7 @@ func TestMaxRdyCount(t *testing.T) { _, err = nsq.Ready(int(opts.MaxRdyCount) + 1).WriteTo(conn) test.Nil(t, err) - resp, err = nsq.ReadResponse(conn) + resp, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ = nsq.UnpackResponse(resp) test.Equal(t, int32(1), frameType) @@ -712,13 +714,13 @@ func TestFatalError(t *testing.T) { _, err = conn.Write([]byte("ASDF\n")) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, int32(1), frameType) test.Equal(t, "E_INVALID invalid command ASDF", string(data)) - _, err = nsq.ReadResponse(conn) + _, err = nsq.ReadResponse(conn, maxMsgSize) test.NotNil(t, err) } @@ -762,7 +764,7 @@ func TestOutputBuffering(t *testing.T) { _, err = nsq.Ready(10).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) end := time.Now() @@ -849,7 +851,7 @@ func TestTLS(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -898,7 +900,7 @@ func TestTLSRequired(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -935,7 +937,7 @@ func TestTLSAuthRequire(t *testing.T) { InsecureSkipVerify: true, } tlsConn := tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // With Unsigned Cert @@ -963,7 +965,7 @@ func TestTLSAuthRequire(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1001,7 +1003,7 @@ func TestTLSAuthRequireVerify(t *testing.T) { InsecureSkipVerify: true, } tlsConn := tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // with invalid cert @@ -1025,7 +1027,7 @@ func TestTLSAuthRequireVerify(t *testing.T) { InsecureSkipVerify: true, } tlsConn = tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // with valid cert @@ -1052,7 +1054,7 @@ func TestTLSAuthRequireVerify(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1083,7 +1085,7 @@ func TestDeflate(t *testing.T) { test.Equal(t, true, r.Deflate) compressConn := flate.NewReader(conn) - resp, _ := nsq.ReadResponse(compressConn) + resp, _ := nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1119,7 +1121,7 @@ func TestSnappy(t *testing.T) { test.Equal(t, true, r.Snappy) compressConn := snappy.NewReader(conn) - resp, _ := nsq.ReadResponse(compressConn) + resp, _ := nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1141,7 +1143,7 @@ func TestSnappy(t *testing.T) { msg := NewMessage(topic.GenerateID(), msgBody) topic.PutMessage(msg) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) test.Equal(t, frameTypeMessage, frameType) @@ -1185,7 +1187,7 @@ func TestTLSDeflate(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1193,7 +1195,7 @@ func TestTLSDeflate(t *testing.T) { compressConn := flate.NewReader(tlsConn) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1246,7 +1248,7 @@ func TestSampling(t *testing.T) { go func() { for { - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) if err != nil { return } @@ -1309,7 +1311,7 @@ func TestTLSSnappy(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1317,7 +1319,7 @@ func TestTLSSnappy(t *testing.T) { compressConn := snappy.NewReader(tlsConn) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1359,7 +1361,7 @@ func TestClientMsgTimeout(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) test.Equal(t, msg.ID, msgOut.ID) @@ -1376,7 +1378,7 @@ func TestClientMsgTimeout(t *testing.T) { _, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn) test.Nil(t, err) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, frameTypeError, frameType) test.Equal(t, fmt.Sprintf("E_FIN_FAILED FIN %s failed ID not in flight", msgOut.ID), @@ -1403,7 +1405,7 @@ func TestBadFin(t *testing.T) { _, err = fin.WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, frameTypeError, frameType) test.Equal(t, "E_INVALID invalid message ID", string(data)) @@ -1435,7 +1437,7 @@ func TestReqTimeoutRange(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -1446,7 +1448,7 @@ func TestReqTimeoutRange(t *testing.T) { test.Nil(t, err) // It should be immediately available for another attempt - resp, err = nsq.ReadResponse(conn) + resp, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ = nsq.UnpackResponse(resp) msgOut, _ = decodeMessage(data) @@ -1558,7 +1560,7 @@ func runAuthTest(t *testing.T, authResponse string, authSecret string, authError test.Nil(t, err) c = tlsConn - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1676,7 +1678,7 @@ func benchmarkProtocolV2PubMultiTopic(b *testing.B, numTopics int) { subWG.Add(1) go func() { for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } @@ -1755,7 +1757,7 @@ func benchmarkProtocolV2Pub(b *testing.B, size int) { subWg.Add(1) go func() { for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } @@ -1846,7 +1848,7 @@ func subWorker(n int, workers int, tcpAddr net.Addr, topicName string, rdyChan c rw.Flush() num := n / workers for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } diff --git a/nsqd/protocol_v2_unixsocket_test.go b/nsqd/protocol_v2_unixsocket_test.go index 6c3f8485d..30d080ce6 100644 --- a/nsqd/protocol_v2_unixsocket_test.go +++ b/nsqd/protocol_v2_unixsocket_test.go @@ -84,7 +84,7 @@ func TestUnixSocketBasicV2(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -123,7 +123,7 @@ func TestUnixSocketMultipleConsumerV2(t *testing.T) { test.Nil(t, err) go func(c net.Conn) { - resp, err := nsq.ReadResponse(c) + resp, err := nsq.ReadResponse(c, maxMsgSize) test.Nil(t, err) _, data, err := nsq.UnpackResponse(resp) test.Nil(t, err) @@ -171,7 +171,7 @@ func TestUnixSocketClientTimeout(t *testing.T) { case <-timer: t.Fatalf("test timed out") default: - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) if err != nil { goto done } @@ -200,7 +200,7 @@ func TestUnixSocketClientHeartbeat(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) test.Equal(t, []byte("_heartbeat_"), data) @@ -321,7 +321,7 @@ func TestUnixSocketPausing(t *testing.T) { topic.PutMessage(msg) // receive the first message via the client, finish it, and send new RDY - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) msg, _ = decodeMessage(data) test.Equal(t, []byte("test body"), msg.Body) @@ -356,7 +356,7 @@ func TestUnixSocketPausing(t *testing.T) { msg = NewMessage(topic.GenerateID(), []byte("test body3")) topic.PutMessage(msg) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) _, data, _ = nsq.UnpackResponse(resp) msg, _ = decodeMessage(data) test.Equal(t, []byte("test body3"), msg.Body) @@ -400,7 +400,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { // PUB that's valid nsq.Publish(topicName, make([]byte, 95)).WriteTo(conn) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -408,7 +408,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { // PUB that's invalid (too big) nsq.Publish(topicName, make([]byte, 105)).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -421,7 +421,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { // PUB thats empty nsq.Publish(topicName, []byte{}).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -439,7 +439,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { } cmd, _ := nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -452,7 +452,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { } cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -471,7 +471,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { mpub = append(mpub, []byte{}) cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -489,7 +489,7 @@ func TestUnixSocketSizeLimits(t *testing.T) { } cmd, _ = nsq.MultiPublish(topicName, mpub) cmd.WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -515,7 +515,7 @@ func TestUnixSocketDPUB(t *testing.T) { // valid nsq.DeferredPublish(topicName, time.Second, make([]byte, 100)).WriteTo(conn) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -532,7 +532,7 @@ func TestUnixSocketDPUB(t *testing.T) { // duration out of range nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) @@ -565,7 +565,7 @@ func TestUnixSocketTouch(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -616,7 +616,7 @@ func TestUnixSocketMaxRdyCount(t *testing.T) { _, err = nsq.Ready(int(opts.MaxRdyCount)).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -626,7 +626,7 @@ func TestUnixSocketMaxRdyCount(t *testing.T) { _, err = nsq.Ready(int(opts.MaxRdyCount) + 1).WriteTo(conn) test.Nil(t, err) - resp, err = nsq.ReadResponse(conn) + resp, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ = nsq.UnpackResponse(resp) test.Equal(t, int32(1), frameType) @@ -647,13 +647,13 @@ func TestUnixSocketFatalError(t *testing.T) { _, err = conn.Write([]byte("ASDF\n")) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, int32(1), frameType) test.Equal(t, "E_INVALID invalid command ASDF", string(data)) - _, err = nsq.ReadResponse(conn) + _, err = nsq.ReadResponse(conn, maxMsgSize) test.NotNil(t, err) } @@ -697,7 +697,7 @@ func TestUnixSocketOutputBuffering(t *testing.T) { _, err = nsq.Ready(10).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) end := time.Now() @@ -784,7 +784,7 @@ func TestUnixSocketTLS(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -833,7 +833,7 @@ func TestUnixSocketTLSRequired(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -870,7 +870,7 @@ func TestUnixSocketTLSAuthRequire(t *testing.T) { InsecureSkipVerify: true, } tlsConn := tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // With Unsigned Cert @@ -898,7 +898,7 @@ func TestUnixSocketTLSAuthRequire(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -936,7 +936,7 @@ func TestUnixSocketTLSAuthRequireVerify(t *testing.T) { InsecureSkipVerify: true, } tlsConn := tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // with invalid cert @@ -960,7 +960,7 @@ func TestUnixSocketTLSAuthRequireVerify(t *testing.T) { InsecureSkipVerify: true, } tlsConn = tls.Client(conn, tlsConfig) - _, err = nsq.ReadResponse(tlsConn) + _, err = nsq.ReadResponse(tlsConn, maxMsgSize) test.NotNil(t, err) // with valid cert @@ -987,7 +987,7 @@ func TestUnixSocketTLSAuthRequireVerify(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1018,7 +1018,7 @@ func TestUnixSocketDeflate(t *testing.T) { test.Equal(t, true, r.Deflate) compressConn := flate.NewReader(conn) - resp, _ := nsq.ReadResponse(compressConn) + resp, _ := nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1049,7 +1049,7 @@ func TestUnixSocketSnappy(t *testing.T) { test.Equal(t, true, r.Snappy) compressConn := snappy.NewReader(conn) - resp, _ := nsq.ReadResponse(compressConn) + resp, _ := nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1071,7 +1071,7 @@ func TestUnixSocketSnappy(t *testing.T) { msg := NewMessage(topic.GenerateID(), msgBody) topic.PutMessage(msg) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) test.Equal(t, frameTypeMessage, frameType) @@ -1115,7 +1115,7 @@ func TestUnixSocketTLSDeflate(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1123,7 +1123,7 @@ func TestUnixSocketTLSDeflate(t *testing.T) { compressConn := flate.NewReader(tlsConn) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1176,7 +1176,7 @@ func TestUnixSocketSampling(t *testing.T) { go func() { for { - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) if err != nil { return } @@ -1239,7 +1239,7 @@ func TestUnixSocketTLSSnappy(t *testing.T) { err = tlsConn.Handshake() test.Nil(t, err) - resp, _ := nsq.ReadResponse(tlsConn) + resp, _ := nsq.ReadResponse(tlsConn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1247,7 +1247,7 @@ func TestUnixSocketTLSSnappy(t *testing.T) { compressConn := snappy.NewReader(tlsConn) - resp, _ = nsq.ReadResponse(compressConn) + resp, _ = nsq.ReadResponse(compressConn, maxMsgSize) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeResponse, frameType) @@ -1289,7 +1289,7 @@ func TestUnixSocketClientMsgTimeout(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) _, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) test.Equal(t, msg.ID, msgOut.ID) @@ -1306,7 +1306,7 @@ func TestUnixSocketClientMsgTimeout(t *testing.T) { _, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn) test.Nil(t, err) - resp, _ = nsq.ReadResponse(conn) + resp, _ = nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, frameTypeError, frameType) test.Equal(t, fmt.Sprintf("E_FIN_FAILED FIN %s failed ID not in flight", msgOut.ID), @@ -1333,7 +1333,7 @@ func TestUnixSocketBadFin(t *testing.T) { _, err = fin.WriteTo(conn) test.Nil(t, err) - resp, _ := nsq.ReadResponse(conn) + resp, _ := nsq.ReadResponse(conn, maxMsgSize) frameType, data, _ := nsq.UnpackResponse(resp) test.Equal(t, frameTypeError, frameType) test.Equal(t, "E_INVALID invalid message ID", string(data)) @@ -1365,7 +1365,7 @@ func TestUnixSocketReqTimeoutRange(t *testing.T) { _, err = nsq.Ready(1).WriteTo(conn) test.Nil(t, err) - resp, err := nsq.ReadResponse(conn) + resp, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ := nsq.UnpackResponse(resp) msgOut, _ := decodeMessage(data) @@ -1376,7 +1376,7 @@ func TestUnixSocketReqTimeoutRange(t *testing.T) { test.Nil(t, err) // It should be immediately available for another attempt - resp, err = nsq.ReadResponse(conn) + resp, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) frameType, data, _ = nsq.UnpackResponse(resp) msgOut, _ = decodeMessage(data) @@ -1501,7 +1501,7 @@ func benchmarkUnixSocketProtocolV2PubMultiTopic(b *testing.B, numTopics int) { subWg.Add(1) go func() { for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } @@ -1594,7 +1594,7 @@ func benchmarkUnixSocketProtocolV2Pub(b *testing.B, size int) { subWg.Add(1) go func() { for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } @@ -1692,7 +1692,7 @@ func subUnixSocketWorker(n int, workers int, addr net.Addr, topicName string, rd rw.Flush() num := n / workers for i := 0; i < num; i++ { - resp, err := nsq.ReadResponse(rw) + resp, err := nsq.ReadResponse(rw, maxMsgSize) if err != nil { panic(err.Error()) } diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index 492d09b30..4f3d2581b 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -69,7 +69,7 @@ func identify(t *testing.T, conn net.Conn) { cmd, _ := nsq.Identify(ci) _, err := cmd.WriteTo(conn) test.Nil(t, err) - _, err = nsq.ReadResponse(conn) + _, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) } @@ -89,7 +89,7 @@ func TestBasicLookupd(t *testing.T) { identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) - v, err := nsq.ReadResponse(conn) + v, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) test.Equal(t, []byte("OK"), v) @@ -164,7 +164,7 @@ func TestChannelUnregister(t *testing.T) { identify(t, conn) nsq.Register(topicName, "ch1").WriteTo(conn) - v, err := nsq.ReadResponse(conn) + v, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) test.Equal(t, []byte("OK"), v) @@ -175,7 +175,7 @@ func TestChannelUnregister(t *testing.T) { test.Equal(t, 1, len(channels)) nsq.UnRegister(topicName, "ch1").WriteTo(conn) - v, err = nsq.ReadResponse(conn) + v, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) test.Equal(t, []byte("OK"), v) @@ -211,11 +211,11 @@ func TestTombstoneRecover(t *testing.T) { identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) nsq.Register(topicName2, "channel2").WriteTo(conn) - _, err = nsq.ReadResponse(conn) + _, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d", @@ -258,7 +258,7 @@ func TestTombstoneUnregister(t *testing.T) { identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d", @@ -274,7 +274,7 @@ func TestTombstoneUnregister(t *testing.T) { test.Equal(t, 0, len(pr.Producers)) nsq.UnRegister(topicName, "").WriteTo(conn) - _, err = nsq.ReadResponse(conn) + _, err = nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) time.Sleep(55 * time.Millisecond) @@ -302,7 +302,7 @@ func TestInactiveNodes(t *testing.T) { identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) @@ -335,7 +335,7 @@ func TestTombstonedNodes(t *testing.T) { identify(t, conn) nsq.Register(topicName, "channel1").WriteTo(conn) - _, err := nsq.ReadResponse(conn) + _, err := nsq.ReadResponse(conn, maxMsgSize) test.Nil(t, err) ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout)) From b7b97a33db92be2219d3ea9bcea67f42617587e9 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 12 May 2024 15:27:49 +0200 Subject: [PATCH 6/6] update go-nsq --- go.mod | 2 +- go.sum | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index dbcbd9704..2e62d2409 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/klauspost/compress v1.17.8 github.com/mreiferson/go-options v1.0.0 github.com/nsqio/go-diskqueue v1.1.0 - github.com/nsqio/go-nsq v1.1.0 + github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295 ) require ( diff --git a/go.sum b/go.sum index c53facc12..1a94a65fe 100644 --- a/go.sum +++ b/go.sum @@ -11,7 +11,6 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= @@ -24,8 +23,8 @@ github.com/mreiferson/go-svc v1.2.2-0.20210815184239-7a96e00010f6 h1:NbuBXARvEXr github.com/mreiferson/go-svc v1.2.2-0.20210815184239-7a96e00010f6/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk= github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q= github.com/nsqio/go-diskqueue v1.1.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI= -github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= -github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295 h1:P5E8d4pd99K4UxkBmqt5y3klvz6hlZcLjyQGLmjwgxM= +github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295/go.mod h1:gi3A+O9Z+6PsytlChJFv3ofbCEpnwmwdYNJkcDM1cxM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=