Skip to content

Commit

Permalink
routing: support multiple namespaced MissionControls
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Aug 13, 2024
1 parent cb691cf commit e49da3e
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 46 deletions.
3 changes: 2 additions & 1 deletion routing/integrated_routing_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
mcMgr, err := NewMissionControlMgr(db, c.source.pubkey, &c.mcCfg)
require.NoError(c.t, err)

mc := mcMgr.GetDefaultStore()
mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace)
require.NoError(c.t, err)

getBandwidthHints := func(_ Graph) (bandwidthHints, error) {
// Create bandwidth hints based on local channel balances.
Expand Down
133 changes: 101 additions & 32 deletions routing/missioncontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,13 @@ type MissionControl struct {
//
// NOTE: currently it only has a MissionControl in the default namespace.
type MissionControlManager struct {
db kvdb.Backend

defaultMCCfg *MissionControlConfig

*mcConfig

mc *MissionControl
mc map[string]*MissionControl
mu sync.Mutex

// TODO(roasbeef): further counters, if vertex continually unavailable,
Expand All @@ -144,12 +148,19 @@ type MissionControlManager struct {
// TODO(roasbeef): also add favorable metrics for nodes
}

// GetDefaultStore returns the MissionControl in the default namespace.
func (m *MissionControlManager) GetDefaultStore() *MissionControl {
// GetNamespacedStore returns the MissionControl in the given namespace. If one
// does not yet exist, then it is initialised.
func (m *MissionControlManager) GetNamespacedStore(ns string) (*MissionControl,
error) {

m.mu.Lock()
defer m.mu.Unlock()

return m.mc
if mc, ok := m.mc[ns]; ok {
return mc, nil
}

return m.initMissionControl(ns)
}

// MissionControlConfig defines parameters that control mission control
Expand Down Expand Up @@ -250,78 +261,136 @@ func NewMissionControlMgr(db kvdb.Backend, self route.Vertex,
return nil, err
}

mcCfg := &mcConfig{
now: time.Now,
selfNode: self,
}

mgr := &MissionControlManager{
db: db,
defaultMCCfg: cfg,
mcConfig: mcCfg,
mc: make(map[string]*MissionControl),
}

if err := mgr.loadMissionControls(); err != nil {
return nil, err
}

for _, mc := range mgr.mc {
if err := mc.init(); err != nil {
return nil, err
}
}

return mgr, nil
}

// loadMissionControls initialises a MissionControl in the default namespace if
// one does not yet exist. It then initialises a MissionControl for all other
// namespaces found in the DB.
func (m *MissionControlManager) loadMissionControls() error {
// Always initialise the default namespace.
_, err := m.initMissionControl(DefaultMissionControlNamespace)
if err != nil {
return err
}

return m.db.View(func(tx walletdb.ReadTx) error {
mcStoreBkt := tx.ReadBucket(resultsKey)
if mcStoreBkt == nil {
return fmt.Errorf("top level mission control bucket " +
"not found")
}

// Iterate through all the keys in bucket and initialise an
// in-memory mission control for each namespace.
return mcStoreBkt.ForEach(func(k, _ []byte) error {
_, err := m.initMissionControl(string(k))

return err
})
}, func() {})
}

// initMissionControl creates a new MissionControl instance with the given
// namespace if one does not yet exist.
func (m *MissionControlManager) initMissionControl(namespace string) (
*MissionControl, error) {

m.mu.Lock()
defer m.mu.Unlock()

// If a mission control with this namespace has already been initialised
// then there is nothing left to do.
if mc, ok := m.mc[namespace]; ok {
return mc, nil
}

cfg := m.defaultMCCfg

store, err := newMissionControlStore(
newDefaultNamespacedStore(db), cfg.MaxMcHistory,
newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
cfg.McFlushInterval,
)
if err != nil {
return nil, err
}

mcCfg := &mcConfig{
now: time.Now,
selfNode: self,
}

// Create a mission control in the default namespace.
defaultMC := &MissionControl{
mcConfig: mcCfg,
mc := &MissionControl{
mcConfig: m.mcConfig,
state: newMissionControlState(cfg.MinFailureRelaxInterval),
store: store,
estimator: cfg.Estimator,
log: build.NewPrefixLog(
fmt.Sprintf("[%s]:", DefaultMissionControlNamespace),
log,
fmt.Sprintf("[%s]:", namespace), log,
),
}

mc := &MissionControlManager{
mcConfig: mcCfg,
mc: defaultMC,
}

if err := mc.init(); err != nil {
return nil, err
}
m.mc[namespace] = mc

return mc, nil
}

// RunStoreTicker runs the mission control store's ticker.
func (m *MissionControlManager) RunStoreTicker() {
// RunStoreTickers runs the mission control store's tickers.
func (m *MissionControlManager) RunStoreTickers() {
m.mu.Lock()
defer m.mu.Unlock()

m.mc.store.run()
for _, mc := range m.mc {
mc.store.run()
}
}

// StopStoreTicker stops the mission control store's ticker.
func (m *MissionControlManager) StopStoreTicker() {
// StopStoreTickers stops the mission control store's tickers.
func (m *MissionControlManager) StopStoreTickers() {
log.Debug("Stopping mission control store ticker")
defer log.Debug("Mission control store ticker stopped")

m.mu.Lock()
defer m.mu.Unlock()

m.mc.store.stop()
for _, mc := range m.mc {
mc.store.stop()
}
}

// init initializes mission control with historical data.
func (m *MissionControlManager) init() error {
func (m *MissionControl) init() error {
log.Debugf("Mission control state reconstruction started")

m.mu.Lock()
defer m.mu.Unlock()

start := time.Now()

results, err := m.mc.store.fetchAll()
results, err := m.store.fetchAll()
if err != nil {
return err
}

for _, result := range results {
_ = m.mc.applyPaymentResult(result)
_ = m.applyPaymentResult(result)
}

log.Debugf("Mission control state reconstruction finished: "+
Expand Down
3 changes: 2 additions & 1 deletion routing/missioncontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (ctx *mcTestContext) restartMc() {
}

mc.now = func() time.Time { return ctx.now }
ctx.mc = mc.GetDefaultStore()
ctx.mc, err = mc.GetNamespacedStore(DefaultMissionControlNamespace)
require.NoError(ctx.t, err)
}

// Assert that mission control returns a probability for an edge.
Expand Down
4 changes: 3 additions & 1 deletion routing/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
graphInstance.graphBackend, route.Vertex{}, mcConfig,
)
require.NoError(t, err, "failed to create missioncontrol")
mc := mcMgr.GetDefaultStore()

mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace)
require.NoError(t, err)

sourceNode, err := graphInstance.graph.SourceNode()
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
return info.NodeKey1Bytes, info.NodeKey2Bytes, nil
},
FindRoute: s.chanRouter.FindRoute,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
ActiveNetParams: r.cfg.ActiveNetParams.Params,
Tower: s.controlTower,
MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry,
Expand Down Expand Up @@ -5868,8 +5868,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,

return r.server.chanRouter.FindBlindedPaths(
r.selfNode, amt,
r.server.missionControl.GetDefaultStore().
GetProbability,
r.server.defaultMC.GetProbability,
blindingRestrictions,
)
},
Expand Down
25 changes: 17 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ type server struct {

breachArbitrator *contractcourt.BreachArbitrator

missionControl *routing.MissionControlManager
missionControlMgr *routing.MissionControlManager
defaultMC *routing.MissionControl

graphBuilder *graph.Builder

Expand Down Expand Up @@ -933,11 +934,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
McFlushInterval: routingConfig.McFlushInterval,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
}
s.missionControl, err = routing.NewMissionControlMgr(
s.missionControlMgr, err = routing.NewMissionControlMgr(
dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
)
if err != nil {
return nil, fmt.Errorf("can't create mission control: %w", err)
return nil, fmt.Errorf("can't create mission control "+
"manager: %w", err)
}
s.defaultMC, err = s.missionControlMgr.GetNamespacedStore(
routing.DefaultMissionControlNamespace,
)
if err != nil {
return nil, fmt.Errorf("can't create mission control in the "+
"default namespace: %w", err)
}

srvrLog.Debugf("Instantiating payment session source with config: "+
Expand All @@ -963,7 +972,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
chanGraph,
),
SourceNode: sourceNode,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
GetLink: s.htlcSwitch.GetLinkByShortID,
PathFindingConfig: pathFindingConfig,
}
Expand Down Expand Up @@ -998,7 +1007,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
Chain: cc.ChainIO,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
SessionSource: paymentSessionSource,
GetLink: s.htlcSwitch.GetLinkByShortID,
NextPaymentID: sequencer.NextID,
Expand Down Expand Up @@ -2115,10 +2124,10 @@ func (s *server) Start() error {
}

cleanup.add(func() error {
s.missionControl.StopStoreTicker()
s.missionControlMgr.StopStoreTickers()
return nil
})
s.missionControl.RunStoreTicker()
s.missionControlMgr.RunStoreTickers()

// Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure
Expand Down Expand Up @@ -2392,7 +2401,7 @@ func (s *server) Stop() error {
srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
err)
}
s.missionControl.StopStoreTicker()
s.missionControlMgr.StopStoreTickers()

// Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer.
Expand Down

0 comments on commit e49da3e

Please sign in to comment.