Skip to content

Commit c18dbf2

Browse files
committed
combine pdp watchers for optimistic sequence create and add
Signed-off-by: Jakub Sztandera <[email protected]>
1 parent 3431b2d commit c18dbf2

File tree

4 files changed

+40
-36
lines changed

4 files changed

+40
-36
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
293293
es := getSenderEth()
294294
sdeps.EthSender = es
295295

296-
pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
297-
pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
296+
pdp.NewProofSetWatch(db, must.One(dependencies.EthClient.Val()), chainSched)
298297

299298
pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
300299
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
@@ -512,7 +511,7 @@ func machineDetails(deps *deps.Deps, activeTasks []harmonytask.TaskInterface, ma
512511
})
513512
sort.Strings(miners)
514513

515-
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
514+
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
516515
(tasks, layers, startup_time, miners, machine_id, machine_name) VALUES ($1, $2, $3, $4, $5, $6)
517516
ON CONFLICT (machine_id) DO UPDATE SET tasks=$1, layers=$2, startup_time=$3, miners=$4, machine_id=$5, machine_name=$6`,
518517
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),

tasks/pdp/proofset_addroot_watch.go

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ import (
1212
"golang.org/x/xerrors"
1313

1414
"github.com/filecoin-project/curio/harmony/harmonydb"
15-
"github.com/filecoin-project/curio/lib/chainsched"
1615
"github.com/filecoin-project/curio/pdp/contract"
17-
18-
chainTypes "github.com/filecoin-project/lotus/chain/types"
1916
)
2017

2118
// Structures to represent database records
@@ -37,21 +34,8 @@ type PieceAddEntry struct {
3734
AddMessageOK *bool `db:"add_message_ok"`
3835
}
3936

40-
// NewWatcherPieceAdd sets up the watcher for data set piece additions
41-
func NewWatcherPieceAdd(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched) {
42-
if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainTypes.TipSet) error {
43-
err := processPendingDataSetPieceAdds(ctx, db, ethClient)
44-
if err != nil {
45-
log.Warnf("Failed to process pending data set piece adds: %v", err)
46-
}
47-
48-
return nil
49-
}); err != nil {
50-
panic(err)
51-
}
52-
}
53-
5437
// processPendingDataSetPieceAdds processes piece additions that have been confirmed on-chain
38+
// it is called from proofset_watch.go
5539
func processPendingDataSetPieceAdds(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client) error {
5640
// Query for pdp_data_set_piece_adds entries where add_message_ok = TRUE
5741
var pieceAdds []DataSetPieceAdd
@@ -131,7 +115,9 @@ func extractAndInsertPiecesFromReceipt(ctx context.Context, db *harmonydb.DB, re
131115
return fmt.Errorf("failed to check if data set exists: %w", err)
132116
}
133117
if !exists {
134-
// XXX: maybe return nil instead to avoid warning?
118+
// this is a rare case where the transaction is marked as complete between create_watch being called and this function
119+
// if that happens, we return an error which will get logged and ignored
120+
// piece addition will get picked up in the next run of the watcher
135121
return fmt.Errorf("data set %d not found in pdp_data_sets", resolvedDataSetId.Int64)
136122
}
137123
}

tasks/pdp/proofset_create_watch.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,16 @@ import (
1212
"golang.org/x/xerrors"
1313

1414
"github.com/filecoin-project/curio/harmony/harmonydb"
15-
"github.com/filecoin-project/curio/lib/chainsched"
1615
"github.com/filecoin-project/curio/pdp/contract"
17-
18-
chainTypes "github.com/filecoin-project/lotus/chain/types"
1916
)
2017

2118
type DataSetCreate struct {
2219
CreateMessageHash string `db:"create_message_hash"`
2320
Service string `db:"service"`
2421
}
2522

26-
func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched) {
27-
if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainTypes.TipSet) error {
28-
err := processPendingDataSetCreates(ctx, db, ethClient)
29-
if err != nil {
30-
log.Warnf("Failed to process pending data set creates: %v", err)
31-
}
32-
return nil
33-
}); err != nil {
34-
panic(err)
35-
}
36-
}
37-
23+
// processPendingDataSetCreates finalises data set creation best on transactions logs
24+
// it is called from proofset_watch.go
3825
func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client) error {
3926
// Query for pdp_data_set_creates entries where ok = TRUE and data_set_created = FALSE
4027
var dataSetCreates []DataSetCreate

tasks/pdp/proofset_watch.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package pdp
2+
3+
import (
4+
"context"
5+
6+
"github.com/ethereum/go-ethereum/ethclient"
7+
8+
"github.com/filecoin-project/curio/harmony/harmonydb"
9+
"github.com/filecoin-project/curio/lib/chainsched"
10+
11+
chainTypes "github.com/filecoin-project/lotus/chain/types"
12+
)
13+
14+
// NewProofSetWatch runes processing steps for proofset creation and piece addtion
15+
// These two are run in sequence to allow for combined create-and-add flow to first
16+
// create the proofset, then add the pieces to it.
17+
func NewProofSetWatch(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched) {
18+
if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainTypes.TipSet) error {
19+
err := processPendingDataSetCreates(ctx, db, ethClient)
20+
if err != nil {
21+
log.Warnf("Failed to process pending data set creates: %v", err)
22+
}
23+
24+
err = processPendingDataSetPieceAdds(ctx, db, ethClient)
25+
if err != nil {
26+
log.Warnf("Failed to process pending data set piece adds: %v", err)
27+
}
28+
return nil
29+
}); err != nil {
30+
panic(err)
31+
}
32+
}

0 commit comments

Comments
 (0)