-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsession.go
76 lines (69 loc) · 2.19 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package w3rc
import (
"context"
"fmt"
"github.com/ipfs-shipyard/w3rc/contentrouting"
"github.com/ipfs-shipyard/w3rc/exchange"
"github.com/ipfs-shipyard/w3rc/planning"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
)
// simpleSession bundles the collaborators that Get uses to retrieve content:
// a provider router, a request scheduler, an exchange multiplexer and the
// link system the result is loaded from.
type simpleSession struct {
// ls is the link system Get loads the root node from once transport work ends.
ls ipld.LinkSystem
// router is queried (FindProviders) for provider records of the requested root.
router contentrouting.Routing
// scheduler turns provider records into plans and is told (Begin/Reconcile)
// about the lifecycle of each transport request.
scheduler planning.Scheduler
// mux accepts transport requests (Add) and exposes their events (Subscribe).
mux *exchange.ExchangeMux
}
// Get retrieves the node identified by root.
//
// It asks the router for provider records for root, feeds them to the
// scheduler together with selector, and hands every transport request from
// the resulting plans to the exchange mux. Transport events read from the mux
// subscription are reported back to the scheduler. When the subscription
// channel is closed, the root link is loaded from the session's link system
// and returned.
//
// Get returns an error when:
//   - ctx is cancelled before the work finishes (the context's error),
//   - the plan channel is closed ("no provider found"),
//   - loading the root from the link system fails.
//
// The derived context passed to the router, scheduler and mux is cancelled
// when Get returns.
func (s *simpleSession) Get(ctx context.Context, root cid.Cid, selector datamodel.Node) (ipld.Node, error) {
	getCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	records := s.router.FindProviders(getCtx, root)
	plan := s.scheduler.Schedule(getCtx, root, selector, records)
	work := s.mux.Subscribe()

	for {
		select {
		case <-getCtx.Done():
			// Stop waiting as soon as the caller gives up instead of blocking
			// until one of the channels happens to be closed.
			return nil, getCtx.Err()

		case nextPlan, more := <-plan:
			if !more {
				return nil, fmt.Errorf("no provider found")
			}
			if nextPlan.Error != nil {
				log.Warnf("planning error: %s\n", nextPlan.Error)
				continue
			}
			for _, tr := range nextPlan.TransportRequests {
				s.scheduler.Begin(tr)
				if err := s.mux.Add(getCtx, tr); err != nil {
					// The request never started; report it as failed.
					s.scheduler.Reconcile(tr, false)
					log.Warnf("could not honor transport req: %s\n", err)
				}
			}

		case transportEvent, more := <-work:
			// A receive from a closed channel yields the zero value, so the
			// closed case must be handled before the event is inspected.
			if !more {
				// if we have everything, things are good.
				link := cidlink.Link{Cid: root}
				// TODO: this isn't enough to know that we've actually loaded the full selector, just that
				// the mux has ended without an error. we need to know we can load the full requested selector
				// to conclude that work is done.
				return s.ls.Load(ipld.LinkContext{Ctx: getCtx}, link, basicnode.Prototype.Any)
			}
			if transportEvent.Event == exchange.ErrorEvent {
				log.Warnf("error in transport: %s\n", transportEvent.State)
				continue
			}
			// NOTE(review): the check above compares .Event with an event
			// constant while these compare .State; confirm against the
			// exchange event type whether .Event was intended here.
			if transportEvent.State == exchange.FailureEvent {
				s.scheduler.Reconcile(transportEvent.Source, false)
			} else if transportEvent.State == exchange.SuccessEvent {
				s.scheduler.Reconcile(transportEvent.Source, true)
			}
		}
	}
}
// Close is a no-op for simpleSession and always returns nil.
func (*simpleSession) Close() error {
	return nil
}