Skip to content

Commit 960c46b

Browse files
authored
feat: streaming upload (#668)
* streaming upload * fix test * fix return * switch to commPv1
1 parent bc716f4 commit 960c46b

File tree

7 files changed

+655
-5
lines changed

7 files changed

+655
-5
lines changed

cmd/pdptool/main.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/minio/sha256-simd"
2626
"github.com/schollz/progressbar/v3"
2727
"github.com/urfave/cli/v2"
28+
"golang.org/x/sync/errgroup"
2829

2930
"github.com/filecoin-project/go-commp-utils/nonffi"
3031
commcid "github.com/filecoin-project/go-fil-commcid"
@@ -66,6 +67,8 @@ func main() {
6667
uploadFileCmd, // upload a file to a pdp service in many chunks
6768
downloadFileCmd, // download a file from curio
6869

70+
streamingPieceUploadCmd, // upload a piece to a pdp service in streaming mode
71+
6972
createDataSetCmd, // create a new data set on the PDP service
7073
getDataSetStatusCmd, // get the status of a data set creation on the PDP service
7174
getDataSetCmd, // retrieve the details of a data set from the PDP service
@@ -1505,3 +1508,255 @@ var removePiecesCmd = &cli.Command{
15051508
return nil
15061509
},
15071510
}
1511+
1512+
var streamingPieceUploadCmd = &cli.Command{
1513+
Name: "upload",
1514+
Usage: "Upload a piece to a PDP service",
1515+
ArgsUsage: "<input-file>",
1516+
Flags: []cli.Flag{
1517+
&cli.StringFlag{
1518+
Name: "service-url",
1519+
Usage: "URL of the PDP service",
1520+
Required: true,
1521+
},
1522+
&cli.StringFlag{
1523+
Name: "jwt-token",
1524+
Usage: "JWT token for authentication (optional if --service-name is provided)",
1525+
},
1526+
&cli.StringFlag{
1527+
Name: "service-name",
1528+
Usage: "Service Name to include in the JWT token (used if --jwt-token is not provided)",
1529+
},
1530+
&cli.StringFlag{
1531+
Name: "notify-url",
1532+
Usage: "Notification URL",
1533+
Required: false,
1534+
},
1535+
&cli.StringFlag{
1536+
Name: "hash-type",
1537+
Usage: "Hash type to use for verification (sha256 or commp)",
1538+
Value: "commp",
1539+
},
1540+
&cli.BoolFlag{
1541+
Name: "local-notif-wait",
1542+
Usage: "Wait for server notification by spawning a temporary local HTTP server",
1543+
},
1544+
},
1545+
Action: func(cctx *cli.Context) error {
1546+
inputFile := cctx.Args().Get(0)
1547+
if inputFile == "" {
1548+
return fmt.Errorf("input file is required")
1549+
}
1550+
1551+
serviceURL := cctx.String("service-url")
1552+
jwtToken := cctx.String("jwt-token")
1553+
notifyURL := cctx.String("notify-url")
1554+
serviceName := cctx.String("service-name")
1555+
hashType := cctx.String("hash-type")
1556+
localNotifWait := cctx.Bool("local-notif-wait")
1557+
1558+
if jwtToken == "" {
1559+
if serviceName == "" {
1560+
return fmt.Errorf("either --jwt-token or --service-name must be provided")
1561+
}
1562+
var err error
1563+
jwtToken, err = getJWTTokenForService(serviceName)
1564+
if err != nil {
1565+
return err
1566+
}
1567+
}
1568+
1569+
if hashType != "sha256" && hashType != "commp" {
1570+
return fmt.Errorf("invalid hash type: %s", hashType)
1571+
}
1572+
1573+
if localNotifWait && notifyURL != "" {
1574+
return fmt.Errorf("cannot specify both --notify-url and --local-notif-wait")
1575+
}
1576+
1577+
var notifyReceived chan struct{}
1578+
var err error
1579+
1580+
if localNotifWait {
1581+
notifyURL, notifyReceived, err = startLocalNotifyServer()
1582+
if err != nil {
1583+
return fmt.Errorf("failed to start local HTTP server: %v", err)
1584+
}
1585+
}
1586+
1587+
// Open the input file
1588+
file, err := os.Open(inputFile)
1589+
if err != nil {
1590+
return fmt.Errorf("failed to open input file: %v", err)
1591+
}
1592+
defer func() {
1593+
_ = file.Close()
1594+
}()
1595+
1596+
// Get the piece size
1597+
fi, err := file.Stat()
1598+
if err != nil {
1599+
return fmt.Errorf("failed to stat input file: %v", err)
1600+
}
1601+
raw_size := fi.Size()
1602+
1603+
client := &http.Client{}
1604+
1605+
req, err := http.NewRequest("GET", serviceURL+"/pdp/piece/uploads", nil)
1606+
if err != nil {
1607+
return fmt.Errorf("failed to create upload request: %v", err)
1608+
}
1609+
if jwtToken != "" {
1610+
req.Header.Set("Authorization", "Bearer "+jwtToken)
1611+
}
1612+
req.Header.Set("Content-Type", "application/json")
1613+
1614+
resp, err := client.Do(req)
1615+
if err != nil {
1616+
return fmt.Errorf("failed to send request: %v", err)
1617+
}
1618+
defer func() {
1619+
_ = resp.Body.Close()
1620+
}()
1621+
1622+
if resp.StatusCode != http.StatusCreated {
1623+
ret, err := io.ReadAll(resp.Body)
1624+
if err != nil {
1625+
return fmt.Errorf("failed to get upload URL, status code %d, failed to read body %s", resp.StatusCode, err.Error())
1626+
}
1627+
return fmt.Errorf("failed to create upload, status code %d: %s", resp.StatusCode, string(ret))
1628+
}
1629+
1630+
location := resp.Header.Get("Location")
1631+
if location == "" {
1632+
return fmt.Errorf("failed to get upload URL, status code %d, no Location header", resp.StatusCode)
1633+
}
1634+
1635+
cp := commp.Calc{}
1636+
1637+
pipeReader, pipeWriter := io.Pipe()
1638+
1639+
// Set up a MultiWriter to write to both cp and the pipe
1640+
multiWriter := io.MultiWriter(&cp, pipeWriter)
1641+
1642+
// Create an error group to handle goroutines
1643+
var g errgroup.Group
1644+
1645+
// Start goroutine to read the file and write to the MultiWriter
1646+
g.Go(func() error {
1647+
defer func() {
1648+
_ = pipeWriter.Close() // Ensure the pipeWriter is closed
1649+
}()
1650+
n, err := io.Copy(multiWriter, file)
1651+
if err != nil {
1652+
return fmt.Errorf("failed to copy data to multiwriter: %v", err)
1653+
}
1654+
if n != raw_size {
1655+
return fmt.Errorf("failed to copy all data to multiwriter, only copied %d/%d bytes", n, raw_size)
1656+
}
1657+
return nil
1658+
})
1659+
1660+
// Start a goroutine to handle the HTTP request
1661+
g.Go(func() error {
1662+
defer func() {
1663+
_ = pipeReader.Close() // Ensure the pipeReader is closed
1664+
}()
1665+
// Prepare the HTTP request for file upload
1666+
req, err := http.NewRequest("PUT", serviceURL+location, pipeReader)
1667+
if err != nil {
1668+
return fmt.Errorf("failed to create upload request: %v", err)
1669+
}
1670+
if jwtToken != "" {
1671+
req.Header.Set("Authorization", "Bearer "+jwtToken)
1672+
}
1673+
req.Header.Set("Content-Type", "application/octet-stream")
1674+
1675+
// Execute the request
1676+
resp, err := client.Do(req)
1677+
if err != nil {
1678+
return fmt.Errorf("failed to send upload request: %v", err)
1679+
}
1680+
defer func() {
1681+
_ = resp.Body.Close()
1682+
}()
1683+
1684+
if resp.StatusCode != http.StatusNoContent {
1685+
ret, err := io.ReadAll(resp.Body)
1686+
if err != nil {
1687+
return fmt.Errorf("failed to upload, status code %d, failed to read body %s", resp.StatusCode, err.Error())
1688+
}
1689+
return fmt.Errorf("upload failed, status code %d: %s", resp.StatusCode, string(ret))
1690+
}
1691+
return nil
1692+
})
1693+
1694+
// Wait for all goroutines to complete
1695+
if err := g.Wait(); err != nil {
1696+
return fmt.Errorf("upload process failed: %v", err)
1697+
}
1698+
1699+
digest, _, err := cp.Digest()
1700+
if err != nil {
1701+
return fmt.Errorf("failed to calculate digest: %v", err)
1702+
}
1703+
1704+
pcid2, err := commcid.DataCommitmentToPieceCidv2(digest, uint64(raw_size))
1705+
if err != nil {
1706+
return fmt.Errorf("failed to compute piece CID: %v", err)
1707+
}
1708+
1709+
// At this point, the commp calculation is complete
1710+
fmt.Printf("CommP: %s\n", pcid2.String())
1711+
1712+
type finalize struct {
1713+
PieceCID string `json:"pieceCid"`
1714+
Notify string `json:"notify,omitempty"`
1715+
}
1716+
1717+
bd := finalize{
1718+
PieceCID: pcid2.String(),
1719+
}
1720+
1721+
if notifyURL != "" {
1722+
bd.Notify = notifyURL
1723+
}
1724+
1725+
bodyBytes, err := json.Marshal(bd)
1726+
if err != nil {
1727+
return fmt.Errorf("failed to marshal finalize request body: %v", err)
1728+
}
1729+
1730+
req, err = http.NewRequest("POST", serviceURL+location, bytes.NewBuffer(bodyBytes))
1731+
if err != nil {
1732+
return fmt.Errorf("failed to create finalize request: %v", err)
1733+
}
1734+
if jwtToken != "" {
1735+
req.Header.Set("Authorization", "Bearer "+jwtToken)
1736+
}
1737+
req.Header.Set("Content-Type", "application/json")
1738+
1739+
resp, err = client.Do(req)
1740+
if err != nil {
1741+
return fmt.Errorf("failed to send finalize request: %v", err)
1742+
}
1743+
defer func() {
1744+
_ = resp.Body.Close()
1745+
}()
1746+
1747+
if resp.StatusCode != http.StatusOK {
1748+
ret, err := io.ReadAll(resp.Body)
1749+
if err != nil {
1750+
return fmt.Errorf("failed to finalize, status code %d, failed to read body %s", resp.StatusCode, err.Error())
1751+
}
1752+
return fmt.Errorf("failed to finalize, status code %d: %s", resp.StatusCode, string(ret))
1753+
}
1754+
1755+
fmt.Println("Piece uploaded successfully.")
1756+
if localNotifWait {
1757+
fmt.Println("Waiting for server notification...")
1758+
<-notifyReceived
1759+
}
1760+
return nil
1761+
},
1762+
}

cuhttp/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDep
295295
libp2p.Router(r, rd)
296296

297297
if sd.EthSender != nil {
298-
pdsvc := pdp.NewPDPService(d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
298+
pdsvc := pdp.NewPDPService(ctx, d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
299299
pdp.Routes(r, pdsvc)
300300
}
301301

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
CREATE TABLE pdp_piece_streaming_uploads (
2+
id UUID PRIMARY KEY NOT NULL,
3+
service TEXT NOT NULL, -- pdp_services.id
4+
5+
piece_cid TEXT, -- piece cid v1
6+
piece_size BIGINT,
7+
raw_size BIGINT,
8+
9+
piece_ref BIGINT, -- packed_piece_refs.ref_id
10+
11+
created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
12+
complete bool,
13+
completed_at TIMESTAMPTZ
14+
);

lib/proof/merkle_sha254_memtree.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/filecoin-project/lotus/storage/sealer/fr32"
1313
)
1414

15-
const MaxMemtreeSize = 256 << 20
15+
const MaxMemtreeSize = 1 << 30
1616

1717
// BuildSha254Memtree builds a sha256 memtree from the input data
1818
// Returned slice should be released to the pool after use

pdp/handlers.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"path"
1313
"strconv"
1414
"strings"
15+
"time"
1516

1617
"github.com/ethereum/go-ethereum/common"
1718
"github.com/ethereum/go-ethereum/core/types"
@@ -52,8 +53,8 @@ type PDPServiceNodeApi interface {
5253
}
5354

5455
// NewPDPService creates a new instance of PDPService with the provided stores
55-
func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
56-
return &PDPService{
56+
func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
57+
p := &PDPService{
5758
Auth: &NullAuth{},
5859
db: db,
5960
storage: stor,
@@ -62,6 +63,9 @@ func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client
6263
ethClient: ec,
6364
filClient: fc,
6465
}
66+
67+
go p.cleanup(ctx)
68+
return p
6569
}
6670

6771
// Routes registers the HTTP routes with the provided router
@@ -113,6 +117,15 @@ func Routes(r *chi.Mux, p *PDPService) {
113117

114118
// PUT /pdp/piece/upload/{uploadUUID}
115119
r.Put(path.Join(PDPRoutePath, "/piece/upload/{uploadUUID}"), p.handlePieceUpload)
120+
121+
// POST /pdp/piece/uploads
122+
r.Post(path.Join(PDPRoutePath, "/piece/uploads"), p.handleStreamingUploadURL)
123+
124+
// PUT /pdp/piece/uploads/{uploadUUID}
125+
r.Put(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleStreamingUpload)
126+
127+
// POST /pdp/piece/uploads/{uploadUUID}
128+
r.Post(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleFinalizeStreamingUpload)
116129
}
117130

118131
// Handler functions
@@ -908,6 +921,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
908921
}
909922
if height > 50 {
910923
http.Error(w, "Invalid height", http.StatusBadRequest)
924+
return
911925
}
912926

913927
// Get raw size by summing up the sizes of subPieces
@@ -1590,3 +1604,36 @@ func asPieceCIDv2(cidStr string, size uint64) (cid.Cid, uint64, error) {
15901604
return cid.Undef, 0, fmt.Errorf("unsupported piece CID type: %d", pieceCid.Prefix().MhType)
15911605
}
15921606
}
1607+
1608+
func (p *PDPService) cleanup(ctx context.Context) {
1609+
rm := func(ctx context.Context, db *harmonydb.DB) {
1610+
1611+
var RefIDs []int64
1612+
1613+
err := db.QueryRow(ctx, `SELECT COALESCE(array_agg(ref_id), '{}') AS ref_ids
1614+
FROM pdp_piece_streaming_uploads
1615+
WHERE complete = TRUE
1616+
AND completed_at <= TIMEZONE('UTC', NOW()) - INTERVAL '60 minutes';`).Scan(&RefIDs)
1617+
if err != nil {
1618+
log.Errorw("failed to get non-finalized uploads", "error", err)
1619+
}
1620+
1621+
if len(RefIDs) > 0 {
1622+
_, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = ANY($1);`, RefIDs)
1623+
if err != nil {
1624+
log.Errorw("failed to delete non-finalized uploads", "error", err)
1625+
}
1626+
}
1627+
}
1628+
1629+
ticker := time.NewTicker(time.Minute * 5)
1630+
defer ticker.Stop()
1631+
for {
1632+
select {
1633+
case <-ticker.C:
1634+
rm(ctx, p.db)
1635+
case <-ctx.Done():
1636+
return
1637+
}
1638+
}
1639+
}

0 commit comments

Comments
 (0)