Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update boxo with new Provide interface #441

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 53 additions & 52 deletions delegatedrouting/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,23 @@ func (listener *Listener) FindProviders(ctx context.Context, key cid.Cid, limit
return nil, errors.New("unsupported find providers request")
}

func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {
const printFrequency = 10_000
cids := req.Keys
pid := req.ID
paddrs := req.Addrs
func (listener *Listener) Provide(ctx context.Context, rec *types.AnnouncementRecord) (time.Duration, error) {
cid := rec.Payload.CID
pid := *rec.Payload.ID
paddrs := make([]multiaddr.Multiaddr, len(rec.Payload.Addrs))
for i, addr := range rec.Payload.Addrs {
paddrs[i] = addr.Multiaddr
}

startTime := time.Now()
listener.lock.Lock()
defer func() {
listener.stats.incDelegatedRoutingCallsProcessed()
log.Infow("Finished processing Provide request.", "time", time.Since(startTime), "len", len(cids))
log.Infow("Finished processing Provide request.", "time", time.Since(startTime))
listener.lock.Unlock()
}()

log.Infof("Received Provide request with %d cids.", len(cids))
log.Infof("Received Provide request")
listener.stats.incDelegatedRoutingCallsReceived()

// shadowing the calling function's context so that cancellation of it doesn't affect processing
Expand All @@ -327,64 +330,57 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
listener.lastSeenProviderInfo.ID = pid
listener.lastSeenProviderInfo.Addrs = paddrs

for i, c := range cids {
// persisting timestamp only if this is not a snapshot
if len(cids) < listener.snapshotSize {
err := listener.dsWrapper.recordCidTimestamp(ctx, c, startTime)
if err != nil {
log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err)
continue
}
// TODO: REMOVE?
// persisting timestamp only if this is not a snapshot
// if len(cids) < listener.snapshotSize {
err := listener.dsWrapper.recordCidTimestamp(ctx, cid, startTime)
if err != nil {
log.Errorw("Error persisting timestamp. Continuing.", "cid", cid, "err", err)
}
// }

listElem := listener.cidQueue.getNodeByCid(cid)
if listElem == nil {
listener.cidQueue.recordCidNode(&cidNode{
C: cid,
Timestamp: startTime,
})
err := listener.chunker.addCidToCurrentChunk(ctx, cid, func(cc *cidsChunk) error {
return listener.notifyPutAndPersist(ctx, cc)
})
if err != nil {
log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", cid, "err", err)
listener.cidQueue.removeCidNode(cid)
}

listElem := listener.cidQueue.getNodeByCid(c)
if listElem == nil {
listener.cidQueue.recordCidNode(&cidNode{
C: c,
Timestamp: startTime,
})
err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error {
} else {
node := listElem.Value.(*cidNode)
node.Timestamp = startTime
listener.cidQueue.recordCidNode(node)
// if no existing chunk has been found for the cid - adding it to the current one
// This can happen in the following cases:
// * when currentChunk disappears between restarts as it doesn't get persisted until it's advertised
// * when the same cid comes multiple times within the lifespan of the same chunk
// * after a error to generate a replacement chunk
if node.chunk == nil {
err := listener.chunker.addCidToCurrentChunk(ctx, cid, func(cc *cidsChunk) error {
return listener.notifyPutAndPersist(ctx, cc)
})
if err != nil {
log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", c, "err", err)
listener.cidQueue.removeCidNode(c)
continue
log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", cid, "err", err)
}
} else {
node := listElem.Value.(*cidNode)
node.Timestamp = startTime
listener.cidQueue.recordCidNode(node)
// if no existing chunk has been found for the cid - adding it to the current one
// This can happen in the following cases:
// * when currentChunk disappears between restarts as it doesn't get persisted until it's advertised
// * when the same cid comes multiple times within the lifespan of the same chunk
// * after a error to generate a replacement chunk
if node.chunk == nil {
err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error {
return listener.notifyPutAndPersist(ctx, cc)
})
if err != nil {
log.Errorw("Error adding a cid to the current chunk. Continuing.", "cid", c, "err", err)
continue
}
}
listener.stats.incExistingCidsProcessed()
}

listener.stats.incCidsProcessed()
// Doing some logging for larger requests
if i != 0 && i%printFrequency == 0 {
log.Infof("Processed %d out of %d CIDs. startTime=%v", i, len(cids), startTime)
}
listener.stats.incExistingCidsProcessed()
}

listener.stats.incCidsProcessed()

removedSomething, err := listener.removeExpiredCids(ctx)
if err != nil {
log.Warnw("Error removing expired cids.", "err", err)
}

// if that was a snapshot or some cids have expired - persisting timestamps as binary blob
if removedSomething || len(cids) >= listener.snapshotSize {
if removedSomething { // TODO: remove ? || len(cids) >= listener.snapshotSize {
listener.dsWrapper.recordTimestampsSnapshot(ctx, listener.cidQueue.getTimestampsSnapshot())
}
return time.Duration(listener.cidTtl), nil
Expand Down Expand Up @@ -550,6 +546,11 @@ func (listener *Listener) notifyPutAndPersist(ctx context.Context, chunk *cidsCh
return nil
}

func (listener *Listener) ProvidePeer(ctx context.Context, rec *types.AnnouncementRecord) (time.Duration, error) {
log.Warn("Received unsupported ProvidePeer request")
return 0, errors.New("unsupported provide peer request")
}

func (listener *Listener) provider() peer.ID {
if listener.configuredProviderInfo == nil {
return listener.lastSeenProviderInfo.ID
Expand Down
33 changes: 27 additions & 6 deletions delegatedrouting/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -140,7 +142,10 @@ func TestProvideRoundtrip(t *testing.T) {
testCid4 := newCid("test4")
testCid5 := newCid("test5")

_, err = errorClient.ProvideBitswap(ctx, []cid.Cid{testCid1}, time.Hour)
_, err = errorClient.Provide(ctx, types.AnnouncementRequest{
CID: testCid1,
TTL: time.Hour,
})
require.Error(t, err, "should get sync error on unsigned provide request.")
errorServer.Close()

Expand Down Expand Up @@ -197,7 +202,10 @@ func TestProvideRoundtripWithRemove(t *testing.T) {
testCid2 := newCid("test2")
testCid3 := newCid("test3")

_, err = errorClient.ProvideBitswap(ctx, []cid.Cid{testCid1}, time.Hour)
_, err = errorClient.Provide(ctx, types.AnnouncementRequest{
CID: testCid1,
TTL: time.Hour,
})
require.Error(t, err, "should get sync error on unsigned provide request.")
errorServer.Close()

Expand Down Expand Up @@ -1380,9 +1388,22 @@ func provide(t *testing.T, cc contentrouter.Client, ctx context.Context, c cid.C
}

func provideMany(t *testing.T, cc contentrouter.Client, ctx context.Context, cids []cid.Cid) time.Duration {
rc, err := cc.ProvideBitswap(ctx, cids, 2*time.Hour)
var announcements []types.AnnouncementRequest
for _, cid := range cids {
announcements = append(announcements, types.AnnouncementRequest{
CID: cid,
TTL: 2 * time.Hour,
})
}

resultIter, err := cc.Provide(ctx, announcements...)
require.NoError(t, err)
return rc

results, err := iter.ReadAllResults(resultIter)
require.NoError(t, err)
require.Len(t, results, len(cids))

return results[0].TTL
}

func generateContextID(cids []string, nonce []byte) []byte {
Expand All @@ -1408,10 +1429,10 @@ func createClientAndServer(t *testing.T, router server.ContentRouter, p *peer.Ad
var c contentrouter.Client
var err error
if p != nil {
c, err = client.New(s.URL, client.WithIdentity(identity), client.WithProviderInfo(p.ID, p.Addrs))
c, err = client.New(s.URL, client.WithProviderInfo(identity, p.ID, p.Addrs, nil))
require.NoError(t, err)
} else {
c, err = client.New(s.URL, client.WithIdentity(identity))
c, err = client.New(s.URL)
require.NoError(t, err)
}

Expand Down
53 changes: 26 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/mock v1.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/boxo v0.19.0
github.com/ipfs/boxo v0.19.1-0.20240508083100-abf64fcd93a9
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
Expand All @@ -23,13 +23,13 @@ require (
github.com/multiformats/go-multiaddr v0.12.3
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/prometheus/client_golang v1.18.0
github.com/rogpeppe/go-internal v1.12.0
github.com/prometheus/client_golang v1.19.0
github.com/rogpeppe/go-internal v1.10.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.2
go.opentelemetry.io/otel v1.21.0
github.com/urfave/cli/v2 v2.25.7
go.opentelemetry.io/otel v1.25.0
go.opentelemetry.io/otel/exporters/prometheus v0.39.0
go.opentelemetry.io/otel/metric v1.21.0
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
)

Expand All @@ -41,7 +41,7 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gammazero/channelqueue v0.2.1 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand All @@ -53,12 +53,12 @@ require (
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/onsi/ginkgo/v2 v2.17.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.42.0 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/quic-go/webtransport-go v0.7.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20240109153615-66e95c3e8a87 // indirect
github.com/whyrusleeping/cbor-gen v0.1.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.20.1 // indirect
Expand All @@ -72,13 +72,13 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/filecoin-project/go-address v1.1.0 // indirect
Expand All @@ -97,16 +97,15 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20230214144701-5d17c9d5243c // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
Expand All @@ -119,7 +118,7 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.17.6 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand Down Expand Up @@ -151,29 +150,29 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.47.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.3 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opentelemetry.io/otel/sdk v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/tools v0.18.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
lukechampine.com/blake3 v1.2.2 // indirect
)
Loading