routing: support multiple namespaced MissionControls

This commit is contained in:
Elle Mouton
2024-08-13 19:19:06 +02:00
parent 5eff056933
commit 8126930d6c
6 changed files with 250 additions and 53 deletions

View File

@@ -168,7 +168,10 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
mcController, err := NewMissionController(db, c.source.pubkey, &c.mcCfg)
require.NoError(c.t, err)
mc := mcController.GetDefaultStore()
mc, err := mcController.GetNamespacedStore(
DefaultMissionControlNamespace,
)
require.NoError(c.t, err)
getBandwidthHints := func(_ Graph) (bandwidthHints, error) {
// Create bandwidth hints based on local channel balances.

View File

@@ -135,12 +135,12 @@ type MissionControl struct {
}
// MissionController manages MissionControl instances in various namespaces.
//
// NOTE: currently it only has a MissionControl in the default namespace.
type MissionController struct {
cfg *mcConfig
db kvdb.Backend
cfg *mcConfig
defaultMCCfg *MissionControlConfig
mc *MissionControl
mc map[string]*MissionControl
mu sync.Mutex
// TODO(roasbeef): further counters, if vertex continually unavailable,
@@ -149,12 +149,33 @@ type MissionController struct {
// TODO(roasbeef): also add favorable metrics for nodes
}
// GetDefaultStore returns the MissionControl in the default namespace.
func (m *MissionController) GetDefaultStore() *MissionControl {
// GetNamespacedStore returns the MissionControl in the given namespace. If one
// does not yet exist, then it is initialised.
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.mc
if mc, ok := m.mc[ns]; ok {
return mc, nil
}
return m.initMissionControl(ns)
}
// ListNamespaces returns a list of the namespaces that the MissionController
// is aware of.
func (m *MissionController) ListNamespaces() []string {
m.mu.Lock()
defer m.mu.Unlock()
namespaces := make([]string, 0, len(m.mc))
for ns := range m.mc {
namespaces = append(namespaces, ns)
}
return namespaces
}
// MissionControlConfig defines parameters that control mission control
@@ -259,61 +280,143 @@ func NewMissionController(db kvdb.Backend, self route.Vertex,
return nil, err
}
mcCfg := &mcConfig{
clock: clock.NewDefaultClock(),
selfNode: self,
}
mgr := &MissionController{
db: db,
defaultMCCfg: cfg,
cfg: mcCfg,
mc: make(map[string]*MissionControl),
}
if err := mgr.loadMissionControls(); err != nil {
return nil, err
}
for _, mc := range mgr.mc {
if err := mc.init(); err != nil {
return nil, err
}
}
return mgr, nil
}
// loadMissionControls initialises a MissionControl in the default namespace if
// one does not yet exist. It then initialises a MissionControl for all other
// namespaces found in the DB.
//
// NOTE: this should only be called once during MissionController construction.
func (m *MissionController) loadMissionControls() error {
m.mu.Lock()
defer m.mu.Unlock()
// Always initialise the default namespace.
_, err := m.initMissionControl(DefaultMissionControlNamespace)
if err != nil {
return err
}
namespaces := make(map[string]struct{})
err = m.db.View(func(tx walletdb.ReadTx) error {
mcStoreBkt := tx.ReadBucket(resultsKey)
if mcStoreBkt == nil {
return fmt.Errorf("top level mission control bucket " +
"not found")
}
// Iterate through all the keys in the bucket and collect the
// namespaces.
return mcStoreBkt.ForEach(func(k, _ []byte) error {
// We've already initialised the default namespace so
// we can skip it.
if string(k) == DefaultMissionControlNamespace {
return nil
}
namespaces[string(k)] = struct{}{}
return nil
})
}, func() {})
if err != nil {
return err
}
// Now, iterate through all the namespaces and initialise them.
for ns := range namespaces {
_, err = m.initMissionControl(ns)
if err != nil {
return err
}
}
return nil
}
// initMissionControl creates a new MissionControl instance with the given
// namespace if one does not yet exist.
//
// NOTE: the MissionController's mutex must be held before calling this method.
func (m *MissionController) initMissionControl(namespace string) (
*MissionControl, error) {
// If a mission control with this namespace has already been initialised
// then there is nothing left to do.
if mc, ok := m.mc[namespace]; ok {
return mc, nil
}
cfg := m.defaultMCCfg
store, err := newMissionControlStore(
newDefaultNamespacedStore(db), cfg.MaxMcHistory,
newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
cfg.McFlushInterval,
)
if err != nil {
return nil, err
}
mcCfg := &mcConfig{
clock: clock.NewDefaultClock(),
selfNode: self,
}
// Create a mission control in the default namespace.
defaultMC := &MissionControl{
cfg: mcCfg,
mc := &MissionControl{
cfg: m.cfg,
state: newMissionControlState(cfg.MinFailureRelaxInterval),
store: store,
estimator: cfg.Estimator,
log: build.NewPrefixLog(
fmt.Sprintf("[%s]:", DefaultMissionControlNamespace),
log,
fmt.Sprintf("[%s]:", namespace), log,
),
onConfigUpdate: cfg.OnConfigUpdate,
}
mc := &MissionController{
cfg: mcCfg,
mc: defaultMC,
}
if err := mc.mc.init(); err != nil {
return nil, err
}
m.mc[namespace] = mc
return mc, nil
}
// RunStoreTicker runs the mission control store's ticker.
func (m *MissionController) RunStoreTicker() {
// RunStoreTickers runs the mission controller store's tickers.
func (m *MissionController) RunStoreTickers() {
m.mu.Lock()
defer m.mu.Unlock()
m.mc.store.run()
for _, mc := range m.mc {
mc.store.run()
}
}
// StopStoreTicker stops the mission control store's ticker.
func (m *MissionController) StopStoreTicker() {
// StopStoreTickers stops the mission control store's tickers.
func (m *MissionController) StopStoreTickers() {
log.Debug("Stopping mission control store ticker")
defer log.Debug("Mission control store ticker stopped")
m.mu.Lock()
defer m.mu.Unlock()
m.mc.store.stop()
for _, mc := range m.mc {
mc.store.stop()
}
}
// init initializes mission control with historical data.

View File

@@ -41,10 +41,11 @@ var (
)
type mcTestContext struct {
t *testing.T
mc *MissionControl
t *testing.T
clock *testClock
mcController *MissionController
mc *MissionControl
clock *testClock
db kvdb.Backend
dbPath string
@@ -88,8 +89,10 @@ func createMcTestContext(t *testing.T) *mcTestContext {
func (ctx *mcTestContext) restartMc() {
// Since we don't run a timer to store results in unit tests, we store
// them here before fetching back everything in NewMissionController.
if ctx.mc != nil {
require.NoError(ctx.t, ctx.mc.store.storeResults())
if ctx.mcController != nil {
for _, mc := range ctx.mcController.mc {
require.NoError(ctx.t, mc.store.storeResults())
}
}
aCfg := AprioriConfig{
@@ -101,7 +104,7 @@ func (ctx *mcTestContext) restartMc() {
estimator, err := NewAprioriEstimator(aCfg)
require.NoError(ctx.t, err)
mc, err := NewMissionController(
ctx.mcController, err = NewMissionController(
ctx.db, mcTestSelf,
&MissionControlConfig{Estimator: estimator},
)
@@ -109,8 +112,18 @@ func (ctx *mcTestContext) restartMc() {
ctx.t.Fatal(err)
}
mc.cfg.clock = ctx.clock
ctx.mc = mc.GetDefaultStore()
ctx.mcController.cfg.clock = ctx.clock
// By default, select the default namespace.
ctx.setNamespacedMC(DefaultMissionControlNamespace)
}
// setNamespacedMC sets the currently selected MissionControl instance of the
// mcTextContext to the one with the given namespace.
func (ctx *mcTestContext) setNamespacedMC(namespace string) {
var err error
ctx.mc, err = ctx.mcController.GetNamespacedStore(namespace)
require.NoError(ctx.t, err)
}
// Assert that mission control returns a probability for an edge.
@@ -233,6 +246,71 @@ func TestMissionControlChannelUpdate(t *testing.T) {
ctx.expectP(100, 0)
}
// TestMissionControlNamespaces tests that the results reported to a
// MissionControl instance in one namespace does not affect the query results in
// another namespace.
func TestMissionControlNamespaces(t *testing.T) {
// Create a new MC context. This will select the default namespace
// MissionControl instance.
ctx := createMcTestContext(t)
// Initially, the controller should only be aware of the default
// namespace.
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
DefaultMissionControlNamespace,
})
// Initial probability is expected to be the apriori.
ctx.expectP(1000, testAprioriHopProbability)
// Expect probability to be zero after reporting the edge as failed.
ctx.reportFailure(1000, lnwire.NewTemporaryChannelFailure(nil))
ctx.expectP(1000, 0)
// Now, switch namespaces.
const newNs = "new-namespace"
ctx.setNamespacedMC(newNs)
// Now, the controller should only be aware of the default namespace and
// the new one.
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
DefaultMissionControlNamespace,
newNs,
})
// Since this new namespace has no idea about the reported failure, the
// expected probability should once again be the apriori probability.
ctx.expectP(1000, testAprioriHopProbability)
// Report a success in the new namespace.
ctx.reportSuccess()
// The probability of the pair should now have increased.
ctx.expectP(1000, testAprioriHopProbability+0.05)
// Switch back to the default namespace.
ctx.setNamespacedMC(DefaultMissionControlNamespace)
// The probability in the default namespace should still be zero.
ctx.expectP(1000, 0)
// We also want to test that the initial loading of the namespaces is
// done correctly. So let's reload the controller and assert that the
// probabilities in both namespaces remain the same after restart.
ctx.restartMc()
// Assert that both namespaces were loaded.
require.ElementsMatch(t, ctx.mcController.ListNamespaces(), []string{
DefaultMissionControlNamespace,
newNs,
})
// Assert that the probabilities in both namespaces remain unchanged.
ctx.expectP(1000, 0)
ctx.setNamespacedMC(newNs)
ctx.expectP(1000, testAprioriHopProbability+0.05)
}
// testClock is an implementation of clock.Clock that lets the caller overwrite
// the current time at any point.
type testClock struct {

View File

@@ -132,7 +132,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
graphInstance.graphBackend, route.Vertex{}, mcConfig,
)
require.NoError(t, err, "failed to create missioncontrol")
mc := mcController.GetDefaultStore()
mc, err := mcController.GetNamespacedStore(
DefaultMissionControlNamespace,
)
require.NoError(t, err)
sourceNode, err := graphInstance.graph.SourceNode()
require.NoError(t, err)

View File

@@ -729,7 +729,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
return info.NodeKey1Bytes, info.NodeKey2Bytes, nil
},
FindRoute: s.chanRouter.FindRoute,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
ActiveNetParams: r.cfg.ActiveNetParams.Params,
Tower: s.controlTower,
MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry,
@@ -6071,8 +6071,7 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
return r.server.chanRouter.FindBlindedPaths(
r.selfNode, amt,
r.server.missionControl.GetDefaultStore().
GetProbability,
r.server.defaultMC.GetProbability,
blindingRestrictions,
)
},

View File

@@ -275,7 +275,8 @@ type server struct {
breachArbitrator *contractcourt.BreachArbitrator
missionControl *routing.MissionController
missionController *routing.MissionController
defaultMC *routing.MissionControl
graphBuilder *graph.Builder
@@ -955,11 +956,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
McFlushInterval: routingConfig.McFlushInterval,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
}
s.missionControl, err = routing.NewMissionController(
s.missionController, err = routing.NewMissionController(
dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
)
if err != nil {
return nil, fmt.Errorf("can't create mission control: %w", err)
return nil, fmt.Errorf("can't create mission control "+
"manager: %w", err)
}
s.defaultMC, err = s.missionController.GetNamespacedStore(
routing.DefaultMissionControlNamespace,
)
if err != nil {
return nil, fmt.Errorf("can't create mission control in the "+
"default namespace: %w", err)
}
srvrLog.Debugf("Instantiating payment session source with config: "+
@@ -985,7 +995,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
chanGraph,
),
SourceNode: sourceNode,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
GetLink: s.htlcSwitch.GetLinkByShortID,
PathFindingConfig: pathFindingConfig,
}
@@ -1020,7 +1030,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
Chain: cc.ChainIO,
Payer: s.htlcSwitch,
Control: s.controlTower,
MissionControl: s.missionControl.GetDefaultStore(),
MissionControl: s.defaultMC,
SessionSource: paymentSessionSource,
GetLink: s.htlcSwitch.GetLinkByShortID,
NextPaymentID: sequencer.NextID,
@@ -2182,10 +2192,10 @@ func (s *server) Start() error {
}
cleanup.add(func() error {
s.missionControl.StopStoreTicker()
s.missionController.StopStoreTickers()
return nil
})
s.missionControl.RunStoreTicker()
s.missionController.RunStoreTickers()
// Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure
@@ -2467,7 +2477,7 @@ func (s *server) Stop() error {
srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
err)
}
s.missionControl.StopStoreTicker()
s.missionController.StopStoreTickers()
// Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer.