From e49da3e993bd248db4881395699bd8ca960159dd Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 13 Aug 2024 19:19:06 +0200 Subject: [PATCH] routing: support multiple namespaced MissionControls --- routing/integrated_routing_context_test.go | 3 +- routing/missioncontrol.go | 133 ++++++++++++++++----- routing/missioncontrol_test.go | 3 +- routing/router_test.go | 4 +- rpcserver.go | 5 +- server.go | 25 ++-- 6 files changed, 127 insertions(+), 46 deletions(-) diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index cbf548111b..46b4971a5f 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -162,7 +162,8 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, mcMgr, err := NewMissionControlMgr(db, c.source.pubkey, &c.mcCfg) require.NoError(c.t, err) - mc := mcMgr.GetDefaultStore() + mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(c.t, err) getBandwidthHints := func(_ Graph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 9921fad542..4b043bc99e 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -133,9 +133,13 @@ type MissionControl struct { // // NOTE: currently it only has a MissionControl in the default namespace. type MissionControlManager struct { + db kvdb.Backend + + defaultMCCfg *MissionControlConfig + *mcConfig - mc *MissionControl + mc map[string]*MissionControl mu sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, @@ -144,12 +148,19 @@ type MissionControlManager struct { // TODO(roasbeef): also add favorable metrics for nodes } -// GetDefaultStore returns the MissionControl in the default namespace. -func (m *MissionControlManager) GetDefaultStore() *MissionControl { +// GetNamespacedStore returns the MissionControl in the given namespace. If one +// does not yet exist, then it is initialised. +func (m *MissionControlManager) GetNamespacedStore(ns string) (*MissionControl, + error) { + m.mu.Lock() defer m.mu.Unlock() - return m.mc + if mc, ok := m.mc[ns]; ok { + return mc, nil + } + + return m.initMissionControl(ns) } // MissionControlConfig defines parameters that control mission control @@ -250,64 +261,122 @@ func NewMissionControlMgr(db kvdb.Backend, self route.Vertex, return nil, err } + mcCfg := &mcConfig{ + now: time.Now, + selfNode: self, + } + + mgr := &MissionControlManager{ + db: db, + defaultMCCfg: cfg, + mcConfig: mcCfg, + mc: make(map[string]*MissionControl), + } + + if err := mgr.loadMissionControls(); err != nil { + return nil, err + } + + for _, mc := range mgr.mc { + if err := mc.init(); err != nil { + return nil, err + } + } + + return mgr, nil +} + +// loadMissionControls initialises a MissionControl in the default namespace if +// one does not yet exist. It then initialises a MissionControl for all other +// namespaces found in the DB. +func (m *MissionControlManager) loadMissionControls() error { + // Always initialise the default namespace. + _, err := m.initMissionControl(DefaultMissionControlNamespace) + if err != nil { + return err + } + + return m.db.View(func(tx walletdb.ReadTx) error { + mcStoreBkt := tx.ReadBucket(resultsKey) + if mcStoreBkt == nil { + return fmt.Errorf("top level mission control bucket " + + "not found") + } + + // Iterate through all the keys in bucket and initialise an + // in-memory mission control for each namespace. + return mcStoreBkt.ForEach(func(k, _ []byte) error { + _, err := m.initMissionControl(string(k)) + + return err + }) + }, func() {}) +} + +// initMissionControl creates a new MissionControl instance with the given +// namespace if one does not yet exist. +func (m *MissionControlManager) initMissionControl(namespace string) ( + *MissionControl, error) { + + m.mu.Lock() + defer m.mu.Unlock() + + // If a mission control with this namespace has already been initialised + // then there is nothing left to do. + if mc, ok := m.mc[namespace]; ok { + return mc, nil + } + + cfg := m.defaultMCCfg + store, err := newMissionControlStore( - newDefaultNamespacedStore(db), cfg.MaxMcHistory, + newNamespacedDB(m.db, namespace), cfg.MaxMcHistory, cfg.McFlushInterval, ) if err != nil { return nil, err } - mcCfg := &mcConfig{ - now: time.Now, - selfNode: self, - } - - // Create a mission control in the default namespace. - defaultMC := &MissionControl{ - mcConfig: mcCfg, + mc := &MissionControl{ + mcConfig: m.mcConfig, state: newMissionControlState(cfg.MinFailureRelaxInterval), store: store, estimator: cfg.Estimator, log: build.NewPrefixLog( - fmt.Sprintf("[%s]:", DefaultMissionControlNamespace), - log, + fmt.Sprintf("[%s]:", namespace), log, ), } - mc := &MissionControlManager{ - mcConfig: mcCfg, - mc: defaultMC, - } - - if err := mc.init(); err != nil { - return nil, err - } + m.mc[namespace] = mc return mc, nil } -// RunStoreTicker runs the mission control store's ticker. -func (m *MissionControlManager) RunStoreTicker() { +// RunStoreTickers runs the mission control store's tickers. +func (m *MissionControlManager) RunStoreTickers() { m.mu.Lock() defer m.mu.Unlock() - m.mc.store.run() + for _, mc := range m.mc { + mc.store.run() + } } -// StopStoreTicker stops the mission control store's ticker. -func (m *MissionControlManager) StopStoreTicker() { +// StopStoreTickers stops the mission control store's tickers. +func (m *MissionControlManager) StopStoreTickers() { log.Debug("Stopping mission control store ticker") defer log.Debug("Mission control store ticker stopped") m.mu.Lock() defer m.mu.Unlock() - m.mc.store.stop() + for _, mc := range m.mc { + mc.store.stop() + } } // init initializes mission control with historical data. -func (m *MissionControlManager) init() error { +func (m *MissionControl) init() error { log.Debugf("Mission control state reconstruction started") m.mu.Lock() @@ -315,13 +384,13 @@ func (m *MissionControlManager) init() error { start := time.Now() - results, err := m.mc.store.fetchAll() + results, err := m.store.fetchAll() if err != nil { return err } for _, result := range results { - _ = m.mc.applyPaymentResult(result) + _ = m.applyPaymentResult(result) } log.Debugf("Mission control state reconstruction finished: "+ diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 2286702b7c..30dabb6283 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -108,7 +108,8 @@ func (ctx *mcTestContext) restartMc() { } mc.now = func() time.Time { return ctx.now } - ctx.mc = mc.GetDefaultStore() + ctx.mc, err = mc.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(ctx.t, err) } // Assert that mission control returns a probability for an edge. diff --git a/routing/router_test.go b/routing/router_test.go index 94a27ec4f4..2b23928a04 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -132,7 +132,9 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") - mc := mcMgr.GetDefaultStore() + + mc, err := mcMgr.GetNamespacedStore(DefaultMissionControlNamespace) + require.NoError(t, err) sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) diff --git a/rpcserver.go b/rpcserver.go index 9fba1fb60a..f814472768 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -716,7 +716,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return info.NodeKey1Bytes, info.NodeKey2Bytes, nil }, FindRoute: s.chanRouter.FindRoute, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, @@ -5868,8 +5868,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context, return r.server.chanRouter.FindBlindedPaths( r.selfNode, amt, - r.server.missionControl.GetDefaultStore(). - GetProbability, + r.server.defaultMC.GetProbability, blindingRestrictions, ) }, diff --git a/server.go b/server.go index 1a623c0f6b..128faf9446 100644 --- a/server.go +++ b/server.go @@ -271,7 +271,8 @@ type server struct { breachArbitrator *contractcourt.BreachArbitrator - missionControl *routing.MissionControlManager + missionControlMgr *routing.MissionControlManager + defaultMC *routing.MissionControl graphBuilder *graph.Builder @@ -933,11 +934,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr, McFlushInterval: routingConfig.McFlushInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, } - s.missionControl, err = routing.NewMissionControlMgr( + s.missionControlMgr, err = routing.NewMissionControlMgr( dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg, ) if err != nil { - return nil, fmt.Errorf("can't create mission control: %w", err) + return nil, fmt.Errorf("can't create mission control "+ + "manager: %w", err) + } + s.defaultMC, err = s.missionControlMgr.GetNamespacedStore( + routing.DefaultMissionControlNamespace, + ) + if err != nil { + return nil, fmt.Errorf("can't create mission control in the "+ + "default namespace: %w", err) } srvrLog.Debugf("Instantiating payment session source with config: "+ @@ -963,7 +972,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanGraph, ), SourceNode: sourceNode, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig, } @@ -998,7 +1007,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower, - MissionControl: s.missionControl.GetDefaultStore(), + MissionControl: s.defaultMC, SessionSource: paymentSessionSource, GetLink: s.htlcSwitch.GetLinkByShortID, NextPaymentID: sequencer.NextID, @@ -2115,10 +2124,10 @@ func (s *server) Start() error { } cleanup.add(func() error { - s.missionControl.StopStoreTicker() + s.missionControlMgr.StopStoreTickers() return nil }) - s.missionControl.RunStoreTicker() + s.missionControlMgr.RunStoreTickers() // Before we start the connMgr, we'll check to see if we have // any backups to recover. We do this now as we want to ensure @@ -2392,7 +2401,7 @@ func (s *server) Stop() error { srvrLog.Warnf("Unable to stop ChannelEventStore: %v", err) } - s.missionControl.StopStoreTicker() + s.missionControlMgr.StopStoreTickers() // Disconnect from each active peers to ensure that // peerTerminationWatchers signal completion to each peer.