From 6d80ddfe911dd696a3ca696898d751837a7653f2 Mon Sep 17 00:00:00 2001
From: Andras Banki-Horvath
Date: Fri, 25 Jun 2021 22:22:12 +0200
Subject: [PATCH 1/3] routing: store missioncontrol state in blocks and
 eliminate cursor use

This commit changes mission control's store update from once per payment
to once per second. Updating the mission control store on every payment
caused gradual slowdown when using etcd. We also completely eliminate the
use of the cursor, further reducing the performance bottleneck.
---
 lnrpc/routerrpc/config.go            |   2 +
 lnrpc/routerrpc/routing_config.go    |   4 +
 routing/missioncontrol.go            |  27 +++-
 routing/missioncontrol_store.go      | 196 +++++++++++++++++++++------
 routing/missioncontrol_store_test.go |  31 ++---
 routing/missioncontrol_test.go       |   7 +
 server.go                            |   8 ++
 7 files changed, 214 insertions(+), 61 deletions(-)

diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go
index 1d1a6f8d4..ec036258f 100644
--- a/lnrpc/routerrpc/config.go
+++ b/lnrpc/routerrpc/config.go
@@ -49,6 +49,7 @@ func DefaultConfig() *Config {
         AttemptCost:     routing.DefaultAttemptCost.ToSatoshis(),
         AttemptCostPPM:  routing.DefaultAttemptCostPPM,
         MaxMcHistory:    routing.DefaultMaxMcHistory,
+        McFlushInterval: routing.DefaultMcFlushInterval,
     }

     return &Config{
@@ -66,5 +67,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
         AttemptCostPPM:  cfg.AttemptCostPPM,
         PenaltyHalfLife: cfg.PenaltyHalfLife,
         MaxMcHistory:    cfg.MaxMcHistory,
+        McFlushInterval: cfg.McFlushInterval,
     }
 }
diff --git a/lnrpc/routerrpc/routing_config.go b/lnrpc/routerrpc/routing_config.go
index dd0fe93d1..2c93cb28d 100644
--- a/lnrpc/routerrpc/routing_config.go
+++ b/lnrpc/routerrpc/routing_config.go
@@ -43,4 +43,8 @@ type RoutingConfig struct {
     // MaxMcHistory defines the maximum number of payment results that
     // are held on disk by mission control.
     MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
+
+    // McFlushInterval defines the timer interval to use to flush mission
+    // control state to the DB.
+    McFlushInterval time.Duration `long:"mcflushinterval" description:"the timer interval to use to flush mission control state to the DB"`
 }
diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go
index d1dc4294e..a9fad3938 100644
--- a/routing/missioncontrol.go
+++ b/routing/missioncontrol.go
@@ -45,6 +45,10 @@ const (
     // DefaultMaxMcHistory is the default maximum history size.
     DefaultMaxMcHistory = 1000

+    // DefaultMcFlushInterval is the default interval we use to flush MC
+    // state to the database.
+    DefaultMcFlushInterval = time.Second
+
     // prevSuccessProbability is the assumed probability for node pairs that
     // successfully relayed the previous attempt.
     prevSuccessProbability = 0.95
@@ -119,6 +123,10 @@ type MissionControlConfig struct {
     // held on disk.
     MaxMcHistory int

+    // McFlushInterval defines the ticker interval when we flush the
+    // accumulated state to the DB.
+    McFlushInterval time.Duration
+
     // MinFailureRelaxInterval is the minimum time that must have passed
     // since the previously recorded failure before the failure amount may
     // be raised.
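Note: reduced to its essentials, the mechanism this patch introduces is a
producer/consumer pattern: payment results are appended to an in-memory
queue, and a single goroutine drains that queue to the database on a
ticker. The sketch below is illustrative only; batchedStore, result and
flush are simplified stand-ins for lnd's missionControlStore,
paymentResult and storeResults, and the DB write is reduced to a print:

package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

type result struct{ id int }

type batchedStore struct {
	mu    sync.Mutex
	queue *list.List
	done  chan struct{}
	wg    sync.WaitGroup
}

func newBatchedStore() *batchedStore {
	return &batchedStore{queue: list.New(), done: make(chan struct{})}
}

// AddResult only appends to the in-memory queue, so it cannot fail;
// this is why the patch drops the error return from AddResult.
func (s *batchedStore) AddResult(r *result) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue.PushBack(r)
}

// run flushes the queue once per interval until stop is called.
func (s *batchedStore) run(interval time.Duration) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				s.flush()
			case <-s.done:
				return
			}
		}
	}()
}

// flush swaps out the queue under the lock so producers are blocked only
// briefly, then persists the drained batch in one go. A real
// implementation would wrap the loop in a single DB transaction.
func (s *batchedStore) flush() {
	s.mu.Lock()
	batch := s.queue
	s.queue = list.New()
	s.mu.Unlock()

	for e := batch.Front(); e != nil; e = e.Next() {
		fmt.Printf("storing result %d\n", e.Value.(*result).id)
	}
}

// stop terminates the flush goroutine; anything queued but not yet
// flushed is dropped in this sketch.
func (s *batchedStore) stop() {
	close(s.done)
	s.wg.Wait()
}

func main() {
	s := newBatchedStore()
	s.run(100 * time.Millisecond)

	s.AddResult(&result{id: 1})
	s.AddResult(&result{id: 2})

	time.Sleep(250 * time.Millisecond) // let the ticker fire
	s.stop()
}

Batching this way turns many small per-payment transactions into one
write per interval, which is where the speedup on etcd comes from.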
@@ -209,7 +217,9 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
         return nil, err
     }

-    store, err := newMissionControlStore(db, cfg.MaxMcHistory)
+    store, err := newMissionControlStore(
+        db, cfg.MaxMcHistory, cfg.McFlushInterval,
+    )
     if err != nil {
         return nil, err
     }
@@ -234,6 +244,16 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
     return mc, nil
 }

+// RunStoreTicker runs the mission control store's ticker.
+func (m *MissionControl) RunStoreTicker() {
+    m.store.run()
+}
+
+// StopStoreTicker stops the mission control store's ticker.
+func (m *MissionControl) StopStoreTicker() {
+    m.store.stop()
+}
+
 // init initializes mission control with historical data.
 func (m *MissionControl) init() error {
     log.Debugf("Mission control state reconstruction started")
@@ -265,6 +285,7 @@ func (m *MissionControl) GetConfig() *MissionControlConfig {
     return &MissionControlConfig{
         ProbabilityEstimatorCfg: m.estimator.ProbabilityEstimatorCfg,
         MaxMcHistory:            m.store.maxRecords,
+        McFlushInterval:         m.store.flushInterval,
         MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
     }
 }
@@ -429,9 +450,7 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) (
     *channeldb.FailureReason, error) {

     // Store complete result in database.
-    if err := m.store.AddResult(result); err != nil {
-        return nil, err
-    }
+    m.store.AddResult(result)

     m.Lock()
     defer m.Unlock()
diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go
index 9e9974e4a..193f7d13a 100644
--- a/routing/missioncontrol_store.go
+++ b/routing/missioncontrol_store.go
@@ -2,8 +2,10 @@ package routing

 import (
     "bytes"
+    "container/list"
     "encoding/binary"
     "fmt"
+    "sync"
     "time"

     "github.com/btcsuite/btcd/wire"
@@ -35,13 +37,38 @@ const (
 // Also changes to mission control parameters can be applied to historical data.
 // Finally, it enables importing raw data from an external source.
 type missionControlStore struct {
-    db         kvdb.Backend
+    done    chan struct{}
+    wg      sync.WaitGroup
+    db      kvdb.Backend
+    queueMx sync.Mutex
+
+    // queue stores all pending payment results not yet added to the store.
+    queue *list.List
+
+    // keys holds the stored MC store item keys in the order of storage.
+    // We use this list when adding/deleting items from the database to
+    // avoid cursor use which may be slow in the remote DB case.
+    keys *list.List
+
+    // keysMap holds the stored MC store item keys. We use this map to check
+    // if a new payment result has already been stored.
+    keysMap map[string]struct{}
+
+    // maxRecords is the maximum number of records we will store in the db.
     maxRecords int
-    numRecords int
+
+    // flushInterval is the configured interval we use to store new results
+    // and delete outdated ones from the db.
+    flushInterval time.Duration
 }

-func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) {
-    var store *missionControlStore
+func newMissionControlStore(db kvdb.Backend, maxRecords int,
+    flushInterval time.Duration) (*missionControlStore, error) {
+
+    var (
+        keys    *list.List
+        keysMap map[string]struct{}
+    )

     // Create buckets if not yet existing.
     err := kvdb.Update(db, func(tx kvdb.RwTx) error {
@@ -51,32 +78,40 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto
                 err)
         }

-        // Count initial number of results and track this number in
-        // memory to avoid calling Stats().KeyN. The reliability of
-        // Stats() is doubtful and seemed to have caused crashes in the
-        // past (see #1874).
+        // Collect all keys to be able to quickly calculate the
+        // difference when updating the DB state.
         c := resultsBucket.ReadCursor()
         for k, _ := c.First(); k != nil; k, _ = c.Next() {
-            store.numRecords++
+            keys.PushBack(k)
+            keysMap[string(k)] = struct{}{}
         }

         return nil
     }, func() {
-        store = &missionControlStore{
-            db:         db,
-            maxRecords: maxRecords,
-        }
+        keys = list.New()
+        keysMap = make(map[string]struct{})
     })
     if err != nil {
         return nil, err
     }

-    return store, nil
+    return &missionControlStore{
+        done:          make(chan struct{}),
+        db:            db,
+        queue:         list.New(),
+        keys:          keys,
+        keysMap:       keysMap,
+        maxRecords:    maxRecords,
+        flushInterval: flushInterval,
+    }, nil
 }

 // clear removes all results from the db.
 func (b *missionControlStore) clear() error {
-    return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+    b.queueMx.Lock()
+    defer b.queueMx.Unlock()
+
+    err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
         if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
             return err
         }
@@ -84,6 +119,13 @@ func (b *missionControlStore) clear() error {
         _, err := tx.CreateTopLevelBucket(resultsKey)
         return err
     }, func() {})
+
+    if err != nil {
+        return err
+    }
+
+    b.queue = list.New()
+    return nil
 }

 // fetchAll returns all results currently stored in the database.
@@ -221,39 +263,117 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
 }

 // AddResult adds a new result to the db.
-func (b *missionControlStore) AddResult(rp *paymentResult) error {
-    return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
-        bucket := tx.ReadWriteBucket(resultsKey)
+func (b *missionControlStore) AddResult(rp *paymentResult) {
+    b.queueMx.Lock()
+    defer b.queueMx.Unlock()
+    b.queue.PushBack(rp)
+}

-        // Prune oldest entries.
-        if b.maxRecords > 0 {
-            for b.numRecords >= b.maxRecords {
-                cursor := bucket.ReadWriteCursor()
-                cursor.First()
-                if err := cursor.Delete(); err != nil {
-                    return err
+// stop stops the store ticker goroutine.
+func (b *missionControlStore) stop() {
+    close(b.done)
+    b.wg.Wait()
+}
+
+// run runs the MC store ticker goroutine.
+func (b *missionControlStore) run() {
+    b.wg.Add(1)
+
+    go func() {
+        ticker := time.NewTicker(b.flushInterval)
+        defer ticker.Stop()
+        defer b.wg.Done()
+
+        for {
+            select {
+            case <-ticker.C:
+                if err := b.storeResults(); err != nil {
+                    log.Errorf("Failed to update mission "+
+                        "control store: %v", err)
                 }
-                b.numRecords--
+
+            case <-b.done:
+                return
             }
         }
+    }()
+}

-        // Serialize result into key and value byte slices.
-        k, v, err := serializeResult(rp)
-        if err != nil {
-            return err
+// storeResults stores all accumulated results.
+func (b *missionControlStore) storeResults() error {
+    b.queueMx.Lock()
+    l := b.queue
+    b.queue = list.New()
+    b.queueMx.Unlock()
+
+    var (
+        keys    *list.List
+        keysMap map[string]struct{}
+    )
+
+    err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+        bucket := tx.ReadWriteBucket(resultsKey)
+
+        for e := l.Front(); e != nil; e = e.Next() {
+            pr := e.Value.(*paymentResult)
+            // Serialize result into key and value byte slices.
+            k, v, err := serializeResult(pr)
+            if err != nil {
+                return err
+            }
+
+            // The store is assumed to be idempotent. It could be
+            // that the same result is added twice and in that case
+            // we don't need to put the value again.
+            if _, ok := keysMap[string(k)]; ok {
+                continue
+            }
+
+            // Put into results bucket.
+            if err := bucket.Put(k, v); err != nil {
+                return err
+            }
+
+            keys.PushBack(k)
+            keysMap[string(k)] = struct{}{}
         }

-        // The store is assumed to be idempotent. It could be that the
-        // same result is added twice and in that case the counter
-        // shouldn't be increased.
-        if bucket.Get(k) == nil {
-            b.numRecords++
+        // Prune oldest entries.
+        for {
+            if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
+                break
+            }
+
+            front := keys.Front()
+            key := front.Value.([]byte)
+
+            if err := bucket.Delete(key); err != nil {
+                return err
+            }
+
+            keys.Remove(front)
+            delete(keysMap, string(key))
         }

-        // Put into results bucket.
-        return bucket.Put(k, v)
-    }, func() {})
+        return nil
+    }, func() {
+        keys = list.New()
+        keys.PushBackList(b.keys)
+
+        keysMap = make(map[string]struct{})
+        for k := range b.keysMap {
+            keysMap[k] = struct{}{}
+        }
+    })
+
+    if err != nil {
+        return err
+    }
+
+    b.keys = keys
+    b.keysMap = keysMap
+
+    return nil
 }

 // getResultKey returns a byte slice representing a unique key for this payment
diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go
index 4653c51eb..f1e3a45f0 100644
--- a/routing/missioncontrol_store_test.go
+++ b/routing/missioncontrol_store_test.go
@@ -10,8 +10,8 @@ import (
     "github.com/davecgh/go-spew/spew"
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lnwire"
-
     "github.com/lightningnetwork/lnd/routing/route"
+    "github.com/stretchr/testify/require"
 )

 const testMaxRecords = 2
@@ -40,7 +40,7 @@ func TestMissionControlStore(t *testing.T) {
     defer db.Close()
     defer os.Remove(dbPath)

-    store, err := newMissionControlStore(db, testMaxRecords)
+    store, err := newMissionControlStore(db, testMaxRecords, time.Second)
     if err != nil {
         t.Fatal(err)
     }
@@ -80,27 +80,21 @@ func TestMissionControlStore(t *testing.T) {
     result2.id = 2

     // Store result.
-    err = store.AddResult(&result2)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result2)

     // Store again to test idempotency.
-    err = store.AddResult(&result2)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result2)

     // Store second result which has an earlier timestamp.
-    err = store.AddResult(&result1)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result1)
+    require.NoError(t, store.storeResults())

     results, err = store.fetchAll()
     if err != nil {
         t.Fatal(err)
     }
+    require.Equal(t, 2, len(results))

     if len(results) != 2 {
         t.Fatal("expected two results")
     }
@@ -116,7 +110,7 @@ func TestMissionControlStore(t *testing.T) {
     }

     // Recreate store to test pruning.
-    store, err = newMissionControlStore(db, testMaxRecords)
+    store, err = newMissionControlStore(db, testMaxRecords, time.Second)
     if err != nil {
         t.Fatal(err)
     }
@@ -128,16 +122,15 @@ func TestMissionControlStore(t *testing.T) {
     result3.id = 3
     result3.failure = &lnwire.FailMPPTimeout{}

-    err = store.AddResult(&result3)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result3)
+    require.NoError(t, store.storeResults())

     // Check that results are pruned.
     results, err = store.fetchAll()
     if err != nil {
         t.Fatal(err)
     }
+    require.Equal(t, 2, len(results))

     if len(results) != 2 {
         t.Fatal("expected two results")
     }
diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go
index 6be9c4cb6..2e79ffb0f 100644
--- a/routing/missioncontrol_test.go
+++ b/routing/missioncontrol_test.go
@@ -9,6 +9,7 @@ import (
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lnwire"
     "github.com/lightningnetwork/lnd/routing/route"
+    "github.com/stretchr/testify/require"
 )

 var (
@@ -77,6 +78,12 @@ func createMcTestContext(t *testing.T) *mcTestContext {

 // restartMc creates a new instance of mission control on the same database.
 func (ctx *mcTestContext) restartMc() {
+    // Since we don't run a timer to store results in unit tests, we store
+    // them here before fetching back everything in NewMissionControl.
+    if ctx.mc != nil {
+        require.NoError(ctx.t, ctx.mc.store.storeResults())
+    }
+
     mc, err := NewMissionControl(
         ctx.db, mcTestSelf,
         &MissionControlConfig{
diff --git a/server.go b/server.go
index bc2d75c00..2b914b90a 100644
--- a/server.go
+++ b/server.go
@@ -746,6 +746,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
         &routing.MissionControlConfig{
             ProbabilityEstimatorCfg: estimatorCfg,
             MaxMcHistory:            routingConfig.MaxMcHistory,
+            McFlushInterval:         routingConfig.McFlushInterval,
             MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
         },
     )
@@ -1663,6 +1664,12 @@ func (s *server) Start() error {
             return nil
         })

+        s.missionControl.RunStoreTicker()
+        cleanup.add(func() error {
+            s.missionControl.StopStoreTicker()
+            return nil
+        })
+
         // Before we start the connMgr, we'll check to see if we have
         // any backups to recover. We do this now as we want to ensure
         // that we have all the information we need to handle channel
@@ -1869,6 +1876,7 @@ func (s *server) Stop() error {
             srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
         }
         s.chanEventStore.Stop()
+        s.missionControl.StopStoreTicker()

         // Disconnect from each active peer to ensure that
         // peerTerminationWatchers signal completion to each peer.

From f13a348c9795e98efcf1ec0656453b1b85407275 Mon Sep 17 00:00:00 2001
From: Andras Banki-Horvath
Date: Mon, 19 Jul 2021 18:45:28 +0200
Subject: [PATCH 2/3] config: update sample-lnd.conf with
 routerrpc.mcflushinterval

---
 sample-lnd.conf | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sample-lnd.conf b/sample-lnd.conf
index 0e6c70a4b..56fba9d67 100644
--- a/sample-lnd.conf
+++ b/sample-lnd.conf
@@ -1023,6 +1023,9 @@ litecoin.node=ltcd
 ; (default: 1000)
 ; routerrpc.maxmchistory=900

+; The time interval with which the MC store state is flushed to the DB.
+; routerrpc.mcflushinterval=1m
+
 ; Path to the router macaroon
 ; routerrpc.routermacaroonpath=~/.lnd/data/chain/bitcoin/simnet/router.macaroon

From cfa3f708f188f5eba8be7f73a6dbf20625586dc4 Mon Sep 17 00:00:00 2001
From: Andras Banki-Horvath
Date: Mon, 19 Jul 2021 18:43:05 +0200
Subject: [PATCH 3/3] docs: update release notes

---
 docs/release-notes/release-notes-0.14.0.md | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md
index 016e42386..3bb870c7c 100644
--- a/docs/release-notes/release-notes-0.14.0.md
+++ b/docs/release-notes/release-notes-0.14.0.md
@@ -43,6 +43,11 @@ addholdinvoice call](https://github.com/lightningnetwork/lnd/pull/5533).
 [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
 to make LND's payment throughput (and latency) better when using etcd.

+## Performance improvements
+
+* [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515)
+  to improve payment throughput when using etcd.
+
 # Contributors (Alphabetical Order)
 * ErikEk
 * Zero-1729
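Note: the cursor elimination in patch 1 works by keeping an in-memory
index of the stored keys: a list in insertion order for finding the
oldest entries, plus a set for duplicate detection. Pruning then needs
only direct bucket Delete calls instead of a ReadWriteCursor walk, which
the commit notes may be slow in the remote DB case. Below is a standalone
sketch of that bookkeeping; prunedIndex and all of its names are
illustrative, not lnd's actual API:

package main

import (
	"container/list"
	"fmt"
)

type prunedIndex struct {
	maxRecords int
	keys       *list.List          // insertion order, oldest at the front
	keysMap    map[string]struct{} // fast membership check for idempotency
}

func newPrunedIndex(max int) *prunedIndex {
	return &prunedIndex{
		maxRecords: max,
		keys:       list.New(),
		keysMap:    make(map[string]struct{}),
	}
}

// add records a key and returns the keys that fell off the front; the
// caller would delete those from the DB in the same write transaction.
func (p *prunedIndex) add(key string) (evicted []string) {
	// Idempotent: adding the same key twice is a no-op.
	if _, ok := p.keysMap[key]; ok {
		return nil
	}
	p.keys.PushBack(key)
	p.keysMap[key] = struct{}{}

	// Evict oldest entries until we are back under the limit.
	for p.maxRecords > 0 && p.keys.Len() > p.maxRecords {
		front := p.keys.Front()
		old := front.Value.(string)
		p.keys.Remove(front)
		delete(p.keysMap, old)
		evicted = append(evicted, old)
	}
	return evicted
}

func main() {
	idx := newPrunedIndex(2)
	for _, k := range []string{"a", "b", "b", "c"} {
		fmt.Printf("add %q -> evict %v\n", k, idx.add(k))
	}
	// add "a" -> evict []
	// add "b" -> evict []
	// add "b" -> evict []
	// add "c" -> evict [a]
}

Because eviction decisions are made from memory, each flush issues only
direct Put and Delete calls against the bucket, avoiding the cursor
round trips that made the old per-payment pruning slow on remote
backends such as etcd.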