Skip to content

Commit

Permalink
plumb through an endpoint to get a recent piece (#48)
Browse files Browse the repository at this point in the history
(of active deals, one of the ones that started in the most recent epoch
in which one started)
  • Loading branch information
willscott authored Oct 3, 2023
2 parents 95d1fed + bc869fb commit c3c32eb
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 0 deletions.
16 changes: 16 additions & 0 deletions api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ func (hf *heyFil) handleSPSubtree(w http.ResponseWriter, r *http.Request) {
} else {
hf.handleGetSP(w, id)
}
case 2:
id := segments[0]
subreq := segments[1]
if id == "" || subreq != "recentPiece" {
// Path does not contain SP ID.
http.Error(w, "SP ID must be specified as URL parameter", http.StatusBadRequest)
} else {
hf.recentPiecesMutex.RLock()
rp, ok := hf.recentPieces[id]
hf.recentPiecesMutex.RUnlock()
if !ok {
http.Error(w, "No piece found for provided id", http.StatusNotFound)
} else if err := json.NewEncoder(w).Encode(rp); err != nil {
logger.Errorw("Failed to encode piece id", "id", rp, "err", err)
}
}
default:
// Path has multiple segments and therefore 404
http.NotFound(w, r)
Expand Down
3 changes: 3 additions & 0 deletions fil_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type (
}
StateMarketDeal struct {
Proposal struct {
PieceCID struct {
CID string `json:"/"`
} `json:"PieceCID"`
Provider string `json:"Provider"`
StartEpoch int64 `json:"StartEpoch"`
EndEpoch int64 `json:"EndEpoch"`
Expand Down
18 changes: 18 additions & 0 deletions fil_deal_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,25 @@ func (ds *dealStats) refresh(ctx context.Context) error {
}

dealCountByParticipant := make(map[string]dealCounts)
latestPieceByParticipant := make(map[string]struct {
epoch int64
cid string
})
var totalDealCount, totalDealCountWithinWeek, totalDealCountWithinDay int64
for {
select {
case <-ctx.Done():
return ctx.Err()
case dr, ok := <-deals:
if !ok {
lp := make(recentPieces)
for k, v := range latestPieceByParticipant {
lp[k] = v.cid
}
ds.refreshLock.Lock()
ds.dealCountByParticipant = dealCountByParticipant
ds.refreshLock.Unlock()
ds.hf.storeRecentPieces(lp)
ds.hf.metrics.notifyDealCount(totalDealCount)
ds.hf.metrics.notifyDealCountWithinDay(totalDealCountWithinDay)
ds.hf.metrics.notifyDealCountWithinWeek(totalDealCountWithinWeek)
Expand Down Expand Up @@ -106,6 +115,15 @@ func (ds *dealStats) refresh(ctx context.Context) error {
}
dealCountByParticipant[provider] = providerDealCount
}
if lp, ok := latestPieceByParticipant[dr.Deal.Proposal.Provider]; !ok || dr.Deal.Proposal.StartEpoch > lp.epoch {
latestPieceByParticipant[dr.Deal.Proposal.Provider] = struct {
epoch int64
cid string
}{
epoch: dr.Deal.Proposal.StartEpoch,
cid: dr.Deal.Proposal.PieceCID.CID,
}
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions heyfil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type (
targetsMutex sync.RWMutex
targets map[string]*Target

recentPiecesMutex sync.RWMutex
recentPieces recentPieces

toCheck chan *Target
checked chan *Target

Expand Down Expand Up @@ -51,6 +54,9 @@ func (hf *heyFil) Start(ctx context.Context) error {
if err := hf.loadTargets(); err != nil {
logger.Warnw("Failed to load targets; continuing operation without pre-existing data.", "err", err)
}
if err := hf.loadRecentPieces(); err != nil {
logger.Warnw("Failed to load recent pieces; continuing operation without pre-existing data.", "err", err)
}
if err := hf.metrics.start(); err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ paths:
responses:
'200':
description: Returns allowed methods
/sp/{id}/recentPiece:
get:
summary: Get the CID of a recent piece that is held by a specific SP
responses:
'200':
description: The recent PieceID held by the requested SP
content:
application/json:
schema:
type: string
'404':
description: SP not found
/sp/{id}:
get:
summary: Get detailed information about a specific SP by its ID
Expand Down
47 changes: 47 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,50 @@ func (hf *heyFil) loadTargets() error {
return nil
})
}

type recentPieces map[string]string

func (hf *heyFil) storeRecentPieces(rp recentPieces) error {
if hf.storePath == "" || rp == nil {
return nil
}
dest, err := os.OpenFile(path.Join(hf.storePath, "recent.pieces"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
logger.Errorw("Failed to open store file for recent pieces", "err", err)
return err
}
defer dest.Close()
if err := json.NewEncoder(dest).Encode(rp); err != nil {
return err
}
hf.recentPiecesMutex.Lock()
hf.recentPieces = rp
hf.recentPiecesMutex.Unlock()
return nil
}

func (hf *heyFil) loadRecentPieces() error {
if hf.storePath == "" {
return nil
}

hf.recentPiecesMutex.Lock()
defer func() {
hf.recentPiecesMutex.Unlock()
logger.Infow("finished loading recent pieces")
}()

source, err := os.Open(path.Join(hf.storePath, "recent.pieces"))
if err != nil {
logger.Errorw("failed to open recent pieces", "err", err)
return nil
}

var target recentPieces
if err := json.NewDecoder(source).Decode(&target); err != nil {
logger.Errorw("failed to decode recent pieces", "err", err)
return nil
}
hf.recentPieces = target
return nil
}

0 comments on commit c3c32eb

Please sign in to comment.