Skip to content

Commit 1fedd16

Browse files
Kubuxurvagg
authored andcommitted
feat(pdp): support create and add endpoint (#692)
* feat(pdp): support create and upload Signed-off-by: Jakub Sztandera <[email protected]> * use nullable data_set id in pdp_data_set_piece_adds to bypass FK Signed-off-by: Jakub Sztandera <[email protected]> * combine pdp watchers for optimistic sequence create and add Signed-off-by: Jakub Sztandera <[email protected]> * Rename the migration file to trigger migration Signed-off-by: Jakub Sztandera <[email protected]> * Check error for insertPieceAdds in combined flow Signed-off-by: Jakub Sztandera <[email protected]> * pieceAddHandler: Remove WHERE dataSet = $id because it doesn't work with NULL Signed-off-by: Jakub Sztandera <[email protected]> * lint clean up and overwrite possible bad entry.DataSet Signed-off-by: Jakub Sztandera <[email protected]> * use decodeExtraData Signed-off-by: Jakub Sztandera <[email protected]> * log when transformAddPiecesRequest fails Signed-off-by: Jakub Sztandera <[email protected]> * Rename proofset -> data set in new files Signed-off-by: Jakub Sztandera <[email protected]> --------- Signed-off-by: Jakub Sztandera <[email protected]>
1 parent ae74c86 commit 1fedd16

File tree

8 files changed

+517
-310
lines changed

8 files changed

+517
-310
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
293293
es := getSenderEth()
294294
sdeps.EthSender = es
295295

296-
pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
297-
pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
296+
pdp.NewDataSetWatch(db, must.One(dependencies.EthClient.Val()), chainSched)
298297

299298
pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
300299
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
@@ -512,7 +511,7 @@ func machineDetails(deps *deps.Deps, activeTasks []harmonytask.TaskInterface, ma
512511
})
513512
sort.Strings(miners)
514513

515-
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
514+
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
516515
(tasks, layers, startup_time, miners, machine_id, machine_name) VALUES ($1, $2, $3, $4, $5, $6)
517516
ON CONFLICT (machine_id) DO UPDATE SET tasks=$1, layers=$2, startup_time=$3, miners=$4, machine_id=$5, machine_name=$6`,
518517
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
-- Changes the data_set column to be nullable in the pdp_data_set_piece_adds table to faciliate create-and-add workflow.
2+
-- Combined migration: make `data_set` nullable and adjust PK
3+
-- New primary key: (add_message_hash HASH, add_message_index ASC)
4+
-- Old primary key: (data_set HASH, add_message_hash ASC, add_message_index ASC)
5+
6+
DO $$
7+
BEGIN
8+
-- Step 1: Drop existing PK if it still uses data_set
9+
IF EXISTS (
10+
SELECT 1
11+
FROM pg_constraint
12+
WHERE conname = 'pdp_data_set_piece_adds_pk'
13+
AND conrelid = 'pdp_data_set_piece_adds'::regclass
14+
) THEN
15+
ALTER TABLE pdp_data_set_piece_adds
16+
DROP CONSTRAINT pdp_data_set_piece_adds_pk;
17+
END IF;
18+
19+
-- Step 2: Create new PK with add_message_hash as HASH key
20+
ALTER TABLE pdp_data_set_piece_adds
21+
ADD CONSTRAINT pdp_data_set_piece_adds_pk
22+
PRIMARY KEY (add_message_hash HASH, add_message_index ASC);
23+
24+
-- Step 3: Make `data_set` nullable if it is currently NOT NULL
25+
IF EXISTS (
26+
SELECT 1
27+
FROM information_schema.columns
28+
WHERE table_name = 'pdp_data_set_piece_adds'
29+
AND column_name = 'data_set'
30+
AND is_nullable = 'NO'
31+
) THEN
32+
ALTER TABLE pdp_data_set_piece_adds
33+
ALTER COLUMN data_set DROP NOT NULL;
34+
END IF;
35+
END $$;

pdp/handlers.go

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ func Routes(r *chi.Mux, p *PDPService) {
7474
// POST /pdp/data-sets - Create a new data set
7575
r.Post("/", p.handleCreateDataSet)
7676

77+
// POST /pdp/data-sets/create-and-add - Create a new data set and add pieces at the same time
78+
r.Post("/create-and-add", p.handleCreateDataSetAndAddPieces)
79+
7780
// GET /pdp/data-sets/created/{txHash} - Get the status of a data set creation
7881
r.Get("/created/{txHash}", p.handleGetDataSetCreationStatus)
7982

@@ -289,53 +292,6 @@ func (p *PDPService) getSenderAddress(ctx context.Context) (common.Address, erro
289292
return address, nil
290293
}
291294

292-
// insertMessageWaitsAndDataSetCreate inserts records into message_waits_eth and pdp_data_set_creates
293-
func (p *PDPService) insertMessageWaitsAndDataSetCreate(ctx context.Context, txHashHex string, serviceLabel string) error {
294-
// Begin a database transaction
295-
_, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
296-
// Insert into message_waits_eth
297-
log.Debugw("Inserting into message_waits_eth",
298-
"txHash", txHashHex,
299-
"status", "pending")
300-
_, err := tx.Exec(`
301-
INSERT INTO message_waits_eth (signed_tx_hash, tx_status)
302-
VALUES ($1, $2)
303-
`, txHashHex, "pending")
304-
if err != nil {
305-
log.Errorw("Failed to insert into message_waits_eth",
306-
"txHash", txHashHex,
307-
"error", err)
308-
return false, err // Return false to rollback the transaction
309-
}
310-
311-
// Insert into pdp_data_set_creates
312-
log.Debugw("Inserting into pdp_data_set_creates",
313-
"txHash", txHashHex,
314-
"service", serviceLabel)
315-
_, err = tx.Exec(`
316-
INSERT INTO pdp_data_set_creates (create_message_hash, service)
317-
VALUES ($1, $2)
318-
`, txHashHex, serviceLabel)
319-
if err != nil {
320-
log.Errorw("Failed to insert into pdp_data_set_creates",
321-
"txHash", txHashHex,
322-
"error", err)
323-
return false, err // Return false to rollback the transaction
324-
}
325-
326-
log.Infow("Successfully inserted orphaned transaction for watching",
327-
"txHash", txHashHex,
328-
"service", serviceLabel,
329-
"waiter_machine_id", "NULL")
330-
// Return true to commit the transaction
331-
return true, nil
332-
}, harmonydb.OptionRetry())
333-
if err != nil {
334-
return err
335-
}
336-
return nil
337-
}
338-
339295
// handleGetDataSetCreationStatus handles the GET request to retrieve the status of a data set creation
340296
func (p *PDPService) handleGetDataSetCreationStatus(w http.ResponseWriter, r *http.Request) {
341297
ctx := r.Context()

0 commit comments

Comments
 (0)