Skip to content

Commit bc716f4

Browse files
ZenGround0rvagg
andauthored
Backport index and ipni pdp tasks to PDPv0 (#670)
* update pieceref schema and indexing task rough draft * Bring in state view * WIP * IPNI task done * Fixes * Move alteration below creation * Fix lint errors and actually use new tasks * review response * Fix calibnet curio bug * Better errors to debug pdptool * allow simple pdp service listener for testing * fix * Allow 0 listener * SimpleService * sql syntax error * SQL missing AND keyword * Chill out ipni announce * More and less logging * more debug * Only publish new heads * fix * init * Use filecoinpin.contact/announce as default direct announce url * Review Response * Fix inconsistent sp_id * Whoops bad sql * further fixes for negativevalue * less verbose before any publish * Review Response * Update tasks/indexing/task_pdp_ipni.go Co-authored-by: Rod Vagg <[email protected]> * Review Response * Lint * Trying very small publish interval * docsgen --------- Co-authored-by: zenground0 <[email protected]> Co-authored-by: Rod Vagg <[email protected]>
1 parent 725dabb commit bc716f4

File tree

15 files changed

+1088
-17
lines changed

15 files changed

+1088
-17
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
287287
return nil, err
288288
}
289289
var sdeps cuhttp.ServiceDeps
290+
idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)
290291

291292
if cfg.Subsystems.EnablePDP {
292293
es := getSenderEth()
@@ -299,11 +300,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
299300
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
300301
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
301302
pdpNotifTask := pdp.NewPDPNotifyTask(db)
302-
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask)
303+
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
304+
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
305+
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask)
303306
}
304307

305-
idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)
306-
307308
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
308309
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
309310
activeTasks = append(activeTasks, ipniTask, indexingTask)

deps/config/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func DefaultCurioConfig() *CurioConfig {
114114
},
115115
IPNI: IPNIConfig{
116116
ServiceURL: []string{"https://cid.contact"},
117-
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
117+
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce", "https://filecoinpin.contact/announce"},
118118
},
119119
},
120120
},

documentation/en/configuration/default-curio-configuration.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ description: The default curio configuration
722722
# heads.
723723
#
724724
# type: []string
725-
#DirectAnnounceURLs = ["https://cid.contact/ingest/announce"]
725+
#DirectAnnounceURLs = ["https://cid.contact/ingest/announce", "https://filecoinpin.contact/announce"]
726726

727727
# Indexing configuration for deal indexing
728728
#

harmony/harmonydb/sql/20240823-ipni.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ CREATE TABLE ipni_peerid (
88
CREATE TABLE ipni (
99
order_number BIGSERIAL PRIMARY KEY, -- Unique increasing order number
1010
ad_cid TEXT NOT NULL,
11-
context_id BYTEA NOT NULL, -- abi.PieceInfo in Curio
11+
context_id BYTEA NOT NULL, -- abi.PieceInfo || PDPIPNIContext in Curio
1212
-- metadata column in not required as Curio only supports one type of metadata(HTTP)
1313
is_rm BOOLEAN NOT NULL,
1414

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- fields tracking indexing and ipni jobs over pdp pieces
2+
ALTER TABLE pdp_piecerefs ADD COLUMN indexing_task_id BIGINT DEFAULT NULL;
3+
ALTER TABLE pdp_piecerefs ADD COLUMN needs_indexing BOOLEAN DEFAULT FALSE;
4+
ALTER TABLE pdp_piecerefs ADD COLUMN ipni_task_id BIGINT DEFAULT NULL;
5+
ALTER TABLE pdp_piecerefs ADD COLUMN needs_ipni BOOLEAN DEFAULT FALSE;

market/ipni/ipni-provider/ipni-provider.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/libp2p/go-libp2p/core/peer"
2828
"github.com/multiformats/go-multiaddr"
2929
"github.com/samber/lo"
30+
"github.com/yugabyte/pgx/v5"
3031
"golang.org/x/xerrors"
3132

3233
"github.com/filecoin-project/go-address"
@@ -49,8 +50,8 @@ const IPNIRoutePath = "/ipni-provider/"
4950
const IPNIPath = "/ipni/v1/ad/"
5051

5152
// publishInterval represents the time interval between each publishing operation.
52-
// It is set to 10 minutes.
53-
const publishInterval = 10 * time.Minute
53+
// It is set to 30 seconds for the purposes of PDP index publishing
54+
const publishInterval = 5 * time.Second
5455
const publishProviderSpacing = 10 * time.Second
5556

5657
var (
@@ -73,6 +74,7 @@ type Provider struct {
7374
indexStore *indexstore.IndexStore
7475
sc *chunker.ServeChunker
7576
keys map[string]*peerInfo // map[peerID String]Private_Key
77+
latest map[string]cid.Cid // map[peerID String]last published head, used to avoid duplicate announce
7678
// announceURLs enables sending direct announcements via HTTP. This is
7779
// the list of indexer URLs to send direct HTTP announce messages to.
7880
announceURLs []*url.URL
@@ -103,8 +105,9 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
103105
for rows.Next() && rows.Err() == nil {
104106
var priv []byte
105107
var peerID string
108+
var sp int64
106109
var spID abi.ActorID
107-
err := rows.Scan(&priv, &peerID, &spID)
110+
err := rows.Scan(&priv, &peerID, &sp)
108111
if err != nil {
109112
return nil, xerrors.Errorf("failed to scan the row: %w", err)
110113
}
@@ -123,6 +126,12 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
123126
return nil, xerrors.Errorf("peer ID mismatch: got %s (calculated), expected %s (DB)", id.String(), peerID)
124127
}
125128

129+
if sp < 0 {
130+
spID = abi.ActorID(0)
131+
} else {
132+
spID = abi.ActorID(sp)
133+
}
134+
126135
maddr, err := address.NewIDAddress(uint64(spID))
127136
if err != nil {
128137
return nil, xerrors.Errorf("parsing miner ID: %w", err)
@@ -193,6 +202,7 @@ func NewProvider(d *deps.Deps) (*Provider, error) {
193202
keys: keyMap,
194203
announceURLs: announceURLs,
195204
httpServerAddresses: httpServerAddresses,
205+
latest: make(map[string]cid.Cid),
196206
}, nil
197207
}
198208

@@ -364,7 +374,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
364374
start := time.Now()
365375

366376
defer func() {
367-
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start))
377+
log.Infow("Served IPNI request", "path", r.URL.Path, "cid", reqCid, "providerId", providerID, "took", time.Since(start), "remote_addr", r.RemoteAddr)
368378
}()
369379

370380
b, err := cid.Parse(reqCid)
@@ -477,7 +487,6 @@ func RemoveCidContact(slice []*url.URL) []*url.URL {
477487
// StartPublishing starts a poller which publishes the head for each provider every 10 minutes.
478488
func (p *Provider) StartPublishing(ctx context.Context) {
479489
var ticker *time.Ticker
480-
481490
// A poller which publishes head for each provider
482491
// every 10 minutes for mainnet build
483492
if build.BuildType == build.BuildMainnet {
@@ -489,12 +498,22 @@ func (p *Provider) StartPublishing(ctx context.Context) {
489498
return
490499
}
491500
log.Info("Starting IPNI provider publishing for testnet build")
501+
ticker = time.NewTicker(publishInterval)
492502
if build.BuildType != build.BuildCalibnet {
493503
ticker = time.NewTicker(time.Second * 10)
494504
log.Info("Resetting IPNI provider publishing ticker to 10 seconds for devnet build")
495505
}
496506
}
497507

508+
// Populated latest head cid from the ipni_head table
509+
for provider := range p.keys {
510+
c, err := p.getHeadCID(ctx, provider)
511+
if err != nil {
512+
log.Errorw("failed to get head CID", "provider", provider, "error", err)
513+
continue
514+
}
515+
p.latest[provider] = c
516+
}
498517
go func(ticker *time.Ticker) {
499518
for {
500519
select {
@@ -518,6 +537,10 @@ func (p *Provider) StartPublishing(ctx context.Context) {
518537
func (p *Provider) getHeadCID(ctx context.Context, provider string) (cid.Cid, error) {
519538
var headStr string
520539
err := p.db.QueryRow(ctx, `SELECT head FROM ipni_head WHERE provider = $1`, provider).Scan(&headStr)
540+
if err == pgx.ErrNoRows {
541+
log.Debugw("no head CID yet for provider", "provider", provider)
542+
return cid.Undef, nil
543+
}
521544
if err != nil {
522545
return cid.Undef, xerrors.Errorf("querying previous head: %w", err)
523546
}
@@ -544,10 +567,17 @@ func (p *Provider) publishHead(ctx context.Context) {
544567
log.Errorw("failed to get head CID", "provider", provider, "error", err)
545568
continue
546569
}
570+
if _, ok := p.latest[provider]; ok && p.latest[provider] == c {
571+
log.Debugw("Skipping duplicate announce for provider", "provider", provider, "cid", c.String())
572+
continue
573+
}
574+
547575
log.Infow("Publishing head for provider", "provider", provider, "cid", c.String())
548576
err = p.publishhttp(ctx, c, provider)
549577
if err != nil {
550578
log.Errorw("failed to publish head for provide", "provider", provider, "error", err)
579+
} else {
580+
p.latest[provider] = c
551581
}
552582

553583
i++
@@ -567,6 +597,7 @@ func (p *Provider) publishProviderSpacingWait() {
567597
// It obtains the HTTP addresses for the peer and sends the announce message to those addresses.
568598
func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string) error {
569599
// Create the http announce sender.
600+
log.Infow("Creating http announce sender", "urls", p.announceURLs)
570601
httpSender, err := httpsender.New(p.announceURLs, p.keys[peer].ID)
571602
if err != nil {
572603
return fmt.Errorf("cannot create http announce sender: %w", err)
@@ -577,7 +608,6 @@ func (p *Provider) publishhttp(ctx context.Context, adCid cid.Cid, peer string)
577608
return fmt.Errorf("cannot create provider http addresses: %w", err)
578609
}
579610

580-
log.Infow("Announcing advertisements over HTTP", "urls", p.announceURLs)
581611
return announce.Send(ctx, adCid, addrs, httpSender)
582612
}
583613

market/ipni/ipni-provider/spark.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ import (
2626

2727
func (p *Provider) updateSparkContract(ctx context.Context) error {
2828
for _, pInfo := range p.keys {
29-
pInfo := pInfo
29+
if pInfo.SPID <= 0 {
30+
log.Debugf("spark does not yet support pdp data")
31+
continue
32+
}
3033
mInfo, err := p.full.StateMinerInfo(ctx, pInfo.Miner, types.EmptyTSK)
3134
if err != nil {
3235
return err
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[
2+
{
3+
"type": "function",
4+
"name": "getDataSetMetadata",
5+
"inputs": [
6+
{
7+
"name": "dataSetId",
8+
"type": "uint256",
9+
"internalType": "uint256"
10+
},
11+
{
12+
"name": "key",
13+
"type": "string",
14+
"internalType": "string"
15+
}
16+
],
17+
"outputs": [
18+
{
19+
"name": "exists",
20+
"type": "bool",
21+
"internalType": "bool"
22+
},
23+
{
24+
"name": "value",
25+
"type": "string",
26+
"internalType": "string"
27+
}
28+
],
29+
"stateMutability": "view"
30+
}
31+
]

0 commit comments

Comments
 (0)