Skip to content

Commit

Permalink
Move to DataDog zstd library and pool decode buffers
Browse files Browse the repository at this point in the history
Use `DataDog/zstd` library for:
* a more performant implementation
* consistent dependency with the one used in Lotus

See benchmark results below.

Before:

```
BenchmarkCborEncoding
BenchmarkCborEncoding-12    	  467352	      2376 ns/op	   14680 B/op	      10 allocs/op
BenchmarkCborDecoding
BenchmarkCborDecoding-12    	  104410	     11347 ns/op	   14944 B/op	      27 allocs/op
BenchmarkZstdEncoding
BenchmarkZstdEncoding-12    	  286735	      3897 ns/op	   46748 B/op	      12 allocs/op
BenchmarkZstdDecoding
BenchmarkZstdDecoding-12    	  110794	     10783 ns/op	   28512 B/op	      28 allocs/op
```

After:

```
BenchmarkCborEncoding
BenchmarkCborEncoding-12    	  439345	      2436 ns/op	   14680 B/op	      10 allocs/op
BenchmarkCborDecoding
BenchmarkCborDecoding-12    	  105511	     11395 ns/op	   14944 B/op	      27 allocs/op
BenchmarkZstdEncoding
BenchmarkZstdEncoding-12    	  247182	      4790 ns/op	   28248 B/op	      11 allocs/op
BenchmarkZstdDecoding
BenchmarkZstdDecoding-12    	  113475	     10694 ns/op	   20617 B/op	      27 allocs/op
```

Fixes: #850 #849
  • Loading branch information
masih committed Jan 27, 2025
1 parent bae93e2 commit 767eeae
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 48 deletions.
6 changes: 1 addition & 5 deletions chainexchange/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,12 @@ func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
if err != nil {
return nil, err
}
zstd, err := encoding.NewZSTD[*Message]()
if err != nil {
return nil, err
}
return &PubSubChainExchange{
options: opts,
chainsWanted: map[uint64]*lru.Cache[gpbft.ECChainKey, *chainPortion]{},
chainsDiscovered: map[uint64]*lru.Cache[gpbft.ECChainKey, *chainPortion]{},
pendingCacheAsWanted: make(chan Message, 100), // TODO: parameterise.
encoding: zstd,
encoding: encoding.NewZSTD[*Message](),
}, nil
}

Expand Down
28 changes: 22 additions & 6 deletions encoding_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ func BenchmarkCborDecoding(b *testing.B) {
for pb.Next() {
var got PartialGMessage
require.NoError(b, encoder.Decode(data, &got))
require.Equal(b, msg, &got)
requireEqualPartialMessages(b, msg, &got)
}
})
}

func BenchmarkZstdEncoding(b *testing.B) {
rng := rand.New(rand.NewSource(seed))
encoder, err := encoding.NewZSTD[*PartialGMessage]()
require.NoError(b, err)
encoder := encoding.NewZSTD[*PartialGMessage]()
msg := generateRandomPartialGMessage(b, rng)

b.ResetTimer()
Expand All @@ -67,8 +66,7 @@ func BenchmarkZstdEncoding(b *testing.B) {

func BenchmarkZstdDecoding(b *testing.B) {
rng := rand.New(rand.NewSource(seed))
encoder, err := encoding.NewZSTD[*PartialGMessage]()
require.NoError(b, err)
encoder := encoding.NewZSTD[*PartialGMessage]()
msg := generateRandomPartialGMessage(b, rng)
data, err := encoder.Encode(msg)
require.NoError(b, err)
Expand All @@ -79,11 +77,29 @@ func BenchmarkZstdDecoding(b *testing.B) {
for pb.Next() {
var got PartialGMessage
require.NoError(b, encoder.Decode(data, &got))
require.Equal(b, msg, &got)
requireEqualPartialMessages(b, msg, &got)
}
})
}

// requireEqualPartialMessages asserts that actual carries the same content as
// expected, failing the benchmark immediately on the first mismatch.
//
// A plain require.Equal on the whole message is not usable here: an empty
// ECChain gets marshaled as null, so a round-tripped message may differ
// structurally from the original while being semantically equal. Chain-valued
// fields are therefore compared with ECChain.Eq and the remaining fields with
// require.Equal.
func requireEqualPartialMessages(b *testing.B, expected, actual *PartialGMessage) {
	// Mark this function as a helper so that failures are attributed to the
	// calling benchmark rather than to a line inside this function.
	b.Helper()

	require.Equal(b, expected.Sender, actual.Sender)
	require.Equal(b, expected.Signature, actual.Signature)
	require.Equal(b, expected.VoteValueKey, actual.VoteValueKey)
	require.Equal(b, expected.Ticket, actual.Ticket)
	require.True(b, expected.Vote.Eq(&actual.Vote))

	if expected.Justification == nil {
		require.Nil(b, actual.Justification)
		return
	}

	// require.NotNil aborts the benchmark on failure, so the dereferences below
	// are only reached when actual.Justification is non-nil.
	require.NotNil(b, actual.Justification)
	require.Equal(b, expected.Justification.Signature, actual.Justification.Signature)
	require.Equal(b, expected.Justification.Signers, actual.Justification.Signers)
	require.True(b, expected.Justification.Vote.Eq(&actual.Justification.Vote))
}

func generateRandomPartialGMessage(b *testing.B, rng *rand.Rand) *PartialGMessage {
var pgmsg PartialGMessage
pgmsg.GMessage = generateRandomGMessage(b, rng)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-f3
go 1.22.7

require (
github.com/DataDog/zstd v1.5.6
github.com/consensys/gnark-crypto v0.12.1
github.com/filecoin-project/go-bitfield v0.2.4
github.com/filecoin-project/go-clock v0.1.0
Expand All @@ -12,7 +13,6 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/klauspost/compress v1.17.11
github.com/libp2p/go-libp2p v0.37.2
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/marcboeker/go-duckdb v1.8.2
Expand Down Expand Up @@ -68,6 +68,7 @@ require (
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY=
github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54=
github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc=
Expand Down
5 changes: 1 addition & 4 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,7 @@ func newRunner(
runner.participant = p

if runner.manifest.PubSub.CompressionEnabled {
runner.msgEncoding, err = encoding.NewZSTD[*PartialGMessage]()
if err != nil {
return nil, err
}
runner.msgEncoding = encoding.NewZSTD[*PartialGMessage]()
} else {
runner.msgEncoding = encoding.NewCBOR[*PartialGMessage]()
}
Expand Down
44 changes: 22 additions & 22 deletions internal/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package encoding
import (
"bytes"
"fmt"
"sync"

"github.com/klauspost/compress/zstd"
"github.com/DataDog/zstd"
cbg "github.com/whyrusleeping/cbor-gen"
)

Expand All @@ -13,6 +14,13 @@ import (
// size in GossipSub.
const maxDecompressedSize = 1 << 20

// bufferPool recycles the destination buffers used when decompressing ZSTD
// payloads, so that a decode does not need to allocate a fresh
// maxDecompressedSize-byte slice every time. Each pooled entry is a pointer to
// a slice of exactly maxDecompressedSize bytes; a pointer is stored rather than
// the slice itself so that placing it in the pool's any value does not cause an
// additional allocation.
var bufferPool = sync.Pool{
	New: func() any {
		scratch := make([]byte, maxDecompressedSize)
		return &scratch
	},
}

type CBORMarshalUnmarshaler interface {
cbg.CBORMarshaler
cbg.CBORUnmarshaler
Expand All @@ -30,11 +38,11 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] {
}

func (c *CBOR[T]) Encode(m T) ([]byte, error) {
var buf bytes.Buffer
if err := m.MarshalCBOR(&buf); err != nil {
var out bytes.Buffer
if err := m.MarshalCBOR(&out); err != nil {
return nil, err
}
return buf.Bytes(), nil
return out.Bytes(), nil
}

func (c *CBOR[T]) Decode(v []byte, t T) error {
Expand All @@ -44,24 +52,12 @@ func (c *CBOR[T]) Decode(v []byte, t T) error {

type ZSTD[T CBORMarshalUnmarshaler] struct {
cborEncoding *CBOR[T]
compressor *zstd.Encoder
decompressor *zstd.Decoder
}

func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
writer, err := zstd.NewWriter(nil)
if err != nil {
return nil, err
}
reader, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxDecompressedSize))
if err != nil {
return nil, err
}
func NewZSTD[T CBORMarshalUnmarshaler]() *ZSTD[T] {
return &ZSTD[T]{
cborEncoding: &CBOR[T]{},
compressor: writer,
decompressor: reader,
}, nil
}
}

func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
Expand All @@ -73,14 +69,18 @@ func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
if err != nil {
return nil, err
}
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
return compressed, nil
// Internally, zstd instantiates a new buffer using zstd.CompressBound if the
// input buffer is nil.
return zstd.Compress(nil, cborEncoded)
}

func (c *ZSTD[T]) Decode(v []byte, t T) error {
cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v)))
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)

length, err := zstd.DecompressInto(*buf, v)
if err != nil {
return err
}
return c.cborEncoding.Decode(cborEncoded, t)
return c.cborEncoding.Decode((*buf)[:length], t)
}
16 changes: 6 additions & 10 deletions internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"io"
"testing"

"github.com/DataDog/zstd"
"github.com/filecoin-project/go-f3/internal/encoding"
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
)
Expand Down Expand Up @@ -45,8 +45,7 @@ func TestCBOR(t *testing.T) {
}

func TestZSTD(t *testing.T) {
encoder, err := encoding.NewZSTD[*testValue]()
require.NoError(t, err)
encoder := encoding.NewZSTD[*testValue]()
data := &testValue{Value: "lobster"}
encoded, err := encoder.Encode(data)
require.NoError(t, err)
Expand All @@ -57,23 +56,20 @@ func TestZSTD(t *testing.T) {
}

func TestZSTDLimits(t *testing.T) {
subject, err := encoding.NewZSTD[*testValue]()
require.NoError(t, err)

writer, err := zstd.NewWriter(nil)
require.NoError(t, err)
subject := encoding.NewZSTD[*testValue]()

var v testValue
v.Value = string(make([]byte, cbg.ByteArrayMaxLen*2))

var buf bytes.Buffer
require.NoError(t, v.MarshalCBOR(&buf))

tooLargeACompression := writer.EncodeAll(buf.Bytes(), nil)
tooLargeACompression, err := zstd.Compress(nil, buf.Bytes())
require.NoError(t, err)
// Assert the compressed size is less than 1MiB, in other words, transportable by
// the default GossipSub message size limit.
require.Less(t, len(tooLargeACompression), 1<<20)

var dest testValue
require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit")
require.True(t, zstd.IsDstSizeTooSmallError(subject.Decode(tooLargeACompression, &dest)))
}

0 comments on commit 767eeae

Please sign in to comment.