From 767eeaee14a578b229e9c4cc9e0af4db0fe53297 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Mon, 27 Jan 2025 18:33:28 +0000 Subject: [PATCH] Move to DataDog zstd library and pool decode buffers Use `DataDog/zstd` library for: * a more performant implementation * consistent dependency with the one used in Lotus See Benchmark results below. Before: ``` BenchmarkCborEncoding BenchmarkCborEncoding-12 467352 2376 ns/op 14680 B/op 10 allocs/op BenchmarkCborDecoding BenchmarkCborDecoding-12 104410 11347 ns/op 14944 B/op 27 allocs/op BenchmarkZstdEncoding BenchmarkZstdEncoding-12 286735 3897 ns/op 46748 B/op 12 allocs/op BenchmarkZstdDecoding BenchmarkZstdDecoding-12 110794 10783 ns/op 28512 B/op 28 allocs/op ``` After: ``` BenchmarkCborEncoding BenchmarkCborEncoding-12 439345 2436 ns/op 14680 B/op 10 allocs/op BenchmarkCborDecoding BenchmarkCborDecoding-12 105511 11395 ns/op 14944 B/op 27 allocs/op BenchmarkZstdEncoding BenchmarkZstdEncoding-12 247182 4790 ns/op 28248 B/op 11 allocs/op BenchmarkZstdDecoding BenchmarkZstdDecoding-12 113475 10694 ns/op 20617 B/op 27 allocs/op ``` Fixes: #850 #849 --- chainexchange/pubsub.go | 6 +--- encoding_bench_test.go | 28 +++++++++++++++---- go.mod | 3 +- go.sum | 2 ++ host.go | 5 +--- internal/encoding/encoding.go | 44 +++++++++++++++--------------- internal/encoding/encoding_test.go | 16 ++++------- 7 files changed, 56 insertions(+), 48 deletions(-) diff --git a/chainexchange/pubsub.go b/chainexchange/pubsub.go index 8596b1d8..0e9ddfca 100644 --- a/chainexchange/pubsub.go +++ b/chainexchange/pubsub.go @@ -46,16 +46,12 @@ func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) { if err != nil { return nil, err } - zstd, err := encoding.NewZSTD[*Message]() - if err != nil { - return nil, err - } return &PubSubChainExchange{ options: opts, chainsWanted: map[uint64]*lru.Cache[gpbft.ECChainKey, *chainPortion]{}, chainsDiscovered: map[uint64]*lru.Cache[gpbft.ECChainKey, *chainPortion]{}, pendingCacheAsWanted: 
make(chan Message, 100), // TODO: parameterise. - encoding: zstd, + encoding: encoding.NewZSTD[*Message](), }, nil } diff --git a/encoding_bench_test.go b/encoding_bench_test.go index 4ec1f8a0..5da357c1 100644 --- a/encoding_bench_test.go +++ b/encoding_bench_test.go @@ -43,15 +43,14 @@ func BenchmarkCborDecoding(b *testing.B) { for pb.Next() { var got PartialGMessage require.NoError(b, encoder.Decode(data, &got)) - require.Equal(b, msg, &got) + requireEqualPartialMessages(b, msg, &got) } }) } func BenchmarkZstdEncoding(b *testing.B) { rng := rand.New(rand.NewSource(seed)) - encoder, err := encoding.NewZSTD[*PartialGMessage]() - require.NoError(b, err) + encoder := encoding.NewZSTD[*PartialGMessage]() msg := generateRandomPartialGMessage(b, rng) b.ResetTimer() @@ -67,8 +66,7 @@ func BenchmarkZstdEncoding(b *testing.B) { func BenchmarkZstdDecoding(b *testing.B) { rng := rand.New(rand.NewSource(seed)) - encoder, err := encoding.NewZSTD[*PartialGMessage]() - require.NoError(b, err) + encoder := encoding.NewZSTD[*PartialGMessage]() msg := generateRandomPartialGMessage(b, rng) data, err := encoder.Encode(msg) require.NoError(b, err) @@ -79,11 +77,29 @@ func BenchmarkZstdDecoding(b *testing.B) { for pb.Next() { var got PartialGMessage require.NoError(b, encoder.Decode(data, &got)) - require.Equal(b, msg, &got) + requireEqualPartialMessages(b, msg, &got) } }) } +func requireEqualPartialMessages(b *testing.B, expected, actual *PartialGMessage) { + // Because empty ECChain gets marshaled as null, we need to use ECChain.Eq for + // checking equality. Hence, the custom equality check. 
+ require.Equal(b, expected.Sender, actual.Sender) + require.Equal(b, expected.Signature, actual.Signature) + require.Equal(b, expected.VoteValueKey, actual.VoteValueKey) + require.Equal(b, expected.Ticket, actual.Ticket) + require.True(b, expected.Vote.Eq(&actual.Vote)) + if expected.Justification == nil { + require.Nil(b, actual.Justification) + } else { + require.NotNil(b, actual.Justification) + require.Equal(b, expected.Justification.Signature, actual.Justification.Signature) + require.Equal(b, expected.Justification.Signers, actual.Justification.Signers) + require.True(b, expected.Justification.Vote.Eq(&actual.Justification.Vote)) + } +} + func generateRandomPartialGMessage(b *testing.B, rng *rand.Rand) *PartialGMessage { var pgmsg PartialGMessage pgmsg.GMessage = generateRandomGMessage(b, rng) diff --git a/go.mod b/go.mod index fe27d62b..9724e6de 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-f3 go 1.22.7 require ( + github.com/DataDog/zstd v1.5.6 github.com/consensys/gnark-crypto v0.12.1 github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-clock v0.1.0 @@ -12,7 +13,6 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 - github.com/klauspost/compress v1.17.11 github.com/libp2p/go-libp2p v0.37.2 github.com/libp2p/go-libp2p-pubsub v0.11.0 github.com/marcboeker/go-duckdb v1.8.2 @@ -68,6 +68,7 @@ require ( github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/goprocess v0.1.4 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect diff --git a/go.sum b/go.sum index e1bd54ca..59567e35 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1 
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= +github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54= github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= diff --git a/host.go b/host.go index 688c3192..b573b57d 100644 --- a/host.go +++ b/host.go @@ -139,10 +139,7 @@ func newRunner( runner.participant = p if runner.manifest.PubSub.CompressionEnabled { - runner.msgEncoding, err = encoding.NewZSTD[*PartialGMessage]() - if err != nil { - return nil, err - } + runner.msgEncoding = encoding.NewZSTD[*PartialGMessage]() } else { runner.msgEncoding = encoding.NewCBOR[*PartialGMessage]() } diff --git a/internal/encoding/encoding.go b/internal/encoding/encoding.go index 1be3cf08..3e9c870f 100644 --- a/internal/encoding/encoding.go +++ b/internal/encoding/encoding.go @@ -3,8 +3,9 @@ package encoding import ( "bytes" "fmt" + "sync" - "github.com/klauspost/compress/zstd" + "github.com/DataDog/zstd" cbg "github.com/whyrusleeping/cbor-gen" ) @@ -13,6 +14,13 @@ import ( // size in GossipSub. 
const maxDecompressedSize = 1 << 20 +var bufferPool = sync.Pool{ + New: func() any { + buf := make([]byte, maxDecompressedSize) + return &buf + }, +} + type CBORMarshalUnmarshaler interface { cbg.CBORMarshaler cbg.CBORUnmarshaler @@ -30,11 +38,11 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] { } func (c *CBOR[T]) Encode(m T) ([]byte, error) { - var buf bytes.Buffer - if err := m.MarshalCBOR(&buf); err != nil { + var out bytes.Buffer + if err := m.MarshalCBOR(&out); err != nil { return nil, err } - return buf.Bytes(), nil + return out.Bytes(), nil } func (c *CBOR[T]) Decode(v []byte, t T) error { @@ -44,24 +52,12 @@ func (c *CBOR[T]) Decode(v []byte, t T) error { type ZSTD[T CBORMarshalUnmarshaler] struct { cborEncoding *CBOR[T] - compressor *zstd.Encoder - decompressor *zstd.Decoder } -func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) { - writer, err := zstd.NewWriter(nil) - if err != nil { - return nil, err - } - reader, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecompressedSize)) - if err != nil { - return nil, err - } +func NewZSTD[T CBORMarshalUnmarshaler]() *ZSTD[T] { return &ZSTD[T]{ cborEncoding: &CBOR[T]{}, - compressor: writer, - decompressor: reader, - }, nil + } } func (c *ZSTD[T]) Encode(m T) ([]byte, error) { @@ -73,14 +69,18 @@ func (c *ZSTD[T]) Encode(m T) ([]byte, error) { if err != nil { return nil, err } - compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded))) - return compressed, nil + // Internally, zstd instantiates a new buffer using zstd.CompressBound if the + // input buffer is nil. 
+ return zstd.Compress(nil, cborEncoded) } func (c *ZSTD[T]) Decode(v []byte, t T) error { - cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v))) + buf := bufferPool.Get().(*[]byte) + defer bufferPool.Put(buf) + + length, err := zstd.DecompressInto(*buf, v) if err != nil { return err } - return c.cborEncoding.Decode(cborEncoded, t) + return c.cborEncoding.Decode((*buf)[:length], t) } diff --git a/internal/encoding/encoding_test.go b/internal/encoding/encoding_test.go index 1d186e9f..a9e585af 100644 --- a/internal/encoding/encoding_test.go +++ b/internal/encoding/encoding_test.go @@ -5,8 +5,8 @@ import ( "io" "testing" + "github.com/DataDog/zstd" "github.com/filecoin-project/go-f3/internal/encoding" - "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" cbg "github.com/whyrusleeping/cbor-gen" ) @@ -45,8 +45,7 @@ func TestCBOR(t *testing.T) { } func TestZSTD(t *testing.T) { - encoder, err := encoding.NewZSTD[*testValue]() - require.NoError(t, err) + encoder := encoding.NewZSTD[*testValue]() data := &testValue{Value: "lobster"} encoded, err := encoder.Encode(data) require.NoError(t, err) @@ -57,11 +56,7 @@ func TestZSTD(t *testing.T) { } func TestZSTDLimits(t *testing.T) { - subject, err := encoding.NewZSTD[*testValue]() - require.NoError(t, err) - - writer, err := zstd.NewWriter(nil) - require.NoError(t, err) + subject := encoding.NewZSTD[*testValue]() var v testValue v.Value = string(make([]byte, cbg.ByteArrayMaxLen*2)) @@ -69,11 +64,12 @@ func TestZSTDLimits(t *testing.T) { var buf bytes.Buffer require.NoError(t, v.MarshalCBOR(&buf)) - tooLargeACompression := writer.EncodeAll(buf.Bytes(), nil) + tooLargeACompression, err := zstd.Compress(nil, buf.Bytes()) + require.NoError(t, err) // Assert the compressed size is less than 1MiB, in other words, transportable by // the default GossipSub message size limit. 
require.Less(t, len(tooLargeACompression), 1<<20) var dest testValue - require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit") + require.True(t, zstd.IsDstSizeTooSmallError(subject.Decode(tooLargeACompression, &dest))) }