From a6abb3c960d627d231ab70c9dea8d7caacdc3b78 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Feb 2023 04:23:41 +0800 Subject: [PATCH 1/4] discovery: increase allowed max future message size This commit adds a new const to increase the max future messages allowed from 100 to 1000, which is needed as during IBD a node with lots of channels might receive many future messages. --- discovery/gossiper.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 96f6b0755..f77390387 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -47,6 +47,10 @@ const ( // updates that we'll hold onto. maxPrematureUpdates = 100 + // maxFutureMessages tracks the max amount of future messages that + // we'll hold onto. + maxFutureMessages = 1000 + // DefaultSubBatchDelay is the default delay we'll use when // broadcasting the next announcement batch. DefaultSubBatchDelay = 5 * time.Second @@ -483,7 +487,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper cfg: &cfg, networkMsgs: make(chan *networkMsg), futureMsgs: lru.NewCache[uint32, *cachedNetworkMsg]( - maxPrematureUpdates, + maxFutureMessages, ), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), From 52facd3e5a3f457c75a296d1f48329469931773d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Feb 2023 04:28:40 +0800 Subject: [PATCH 2/4] mod: bump neutrino lru cache version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 353aa55e7..5569832ee 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/jrick/logrotate v1.0.0 github.com/kkdai/bstream v1.0.0 github.com/lightninglabs/neutrino v0.15.0 - github.com/lightninglabs/neutrino/cache v1.1.0 + github.com/lightninglabs/neutrino/cache v1.1.1 github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display github.com/lightningnetwork/lightning-onion v1.2.1-0.20221202012345-ca23184850a1 github.com/lightningnetwork/lnd/cert v1.2.1 diff --git a/go.sum b/go.sum index e4700756a..76d2ebd0c 100644 --- a/go.sum +++ b/go.sum @@ -393,8 +393,8 @@ github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/neutrino v0.15.0 h1:yr3uz36fLAq8hyM0TRUVlef1TRNoWAqpmmNlVtKUDtI= github.com/lightninglabs/neutrino v0.15.0/go.mod h1:pmjwElN/091TErtSE9Vd5W4hpxoG2/+xlb+HoPm9Gug= -github.com/lightninglabs/neutrino/cache v1.1.0 h1:szZIhVabiQIsGzJjhvo76sj05Au+zVotj2M34EquGME= -github.com/lightninglabs/neutrino/cache v1.1.0/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo= +github.com/lightninglabs/neutrino/cache v1.1.1 h1:TllWOSlkABhpgbWJfzsrdUaDH2fBy/54VSIB4vVqV8M= +github.com/lightninglabs/neutrino/cache v1.1.1/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo= github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display h1:RZJ8H4ueU/aQ9pFtx5wqsuD3B/DezrewJeVwDKKYY8E= github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display/go.mod h1:2oKOBU042GKFHrdbgGiKax4xVrFiZu51lhacUZQ9MnE= github.com/lightningnetwork/lightning-onion v1.2.1-0.20221202012345-ca23184850a1 h1:Wm0g70gkcAu2pGpNZwfWPSVOY21j8IyYsNewwK4OkT4= From 78a983c014172a3f3f2dd4f2477b117592a71110 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 24 Feb 2023 04:29:35 +0800 Subject: [PATCH 3/4] discovery: flatten future message cache This commit removes the slice used when saving future messages into the cache. Instead, each message is now saved independently into the cache with a monotonically increasing integer as its ID. --- discovery/gossiper.go | 101 +++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 40 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f77390387..be7be8516 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/btcsuite/btcd/btcec/v2" @@ -428,8 +429,9 @@ type AuthenticatedGossiper struct { // height specified in the future. We will save them and resend it to // the chan networkMsgs once the block height has reached. The cached // map format is, - // {blockHeight: [msg1, msg2, ...], ...} - futureMsgs *lru.Cache[uint32, *cachedNetworkMsg] + // {msgID1: msg1, msgID2: msg2, ...} + futureMsgs *lru.Cache[uint64, *cachedFutureMsg] + futureMsgID atomic.Uint64 // chanPolicyUpdates is a channel that requests to update the // forwarding policy of a set of channels is sent over. @@ -486,7 +488,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper selfKeyLoc: selfKeyDesc.KeyLocator, cfg: &cfg, networkMsgs: make(chan *networkMsg), - futureMsgs: lru.NewCache[uint32, *cachedNetworkMsg]( + futureMsgs: lru.NewCache[uint64, *cachedFutureMsg]( maxFutureMessages, ), quit: make(chan struct{}), @@ -636,33 +638,64 @@ func (d *AuthenticatedGossiper) syncBlockHeight() { } } +// cachedFutureMsg is a future message that's saved to the `futureMsgs` cache. +type cachedFutureMsg struct { + // msg is the network message. + msg *networkMsg + + // height is the block height. + height uint32 +} + +// Size returns the size of the message. +func (c *cachedFutureMsg) Size() (uint64, error) { + // Return a constant 1. + return 1, nil +} + // resendFutureMessages takes a block height, resends all the future messages -// found at that height and deletes those messages found in the gossiper's -// futureMsgs. +// found below and equal to that height and deletes those messages found in the +// gossiper's futureMsgs. func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) { - result, err := d.futureMsgs.Get(height) + var ( + // msgs are the target messages. + msgs []*networkMsg + + // keys are the target messages' caching keys. + keys []uint64 + ) + + // filterMsgs is the visitor used when iterating the future cache. + filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool { + if cmsg.height <= height { + msgs = append(msgs, cmsg.msg) + keys = append(keys, k) + } + + return true + } + + // Filter out the target messages. + d.futureMsgs.Range(filterMsgs) // Return early if no messages found. - if err == cache.ErrElementNotFound { + if len(msgs) == 0 { return } - // The error must nil, we will log an error and exit. - if err != nil { - log.Errorf("Reading future messages got error: %v", err) - return + // Remove the filtered messages. + for _, key := range keys { + d.futureMsgs.Delete(key) } - msgs := result.msgs - log.Debugf("Resending %d network messages at height %d", len(msgs), height) - for _, pMsg := range msgs { + for _, msg := range msgs { select { - case d.networkMsgs <- pMsg.msg: + case d.networkMsgs <- msg: case <-d.quit: - pMsg.msg.err <- ErrGossiperShuttingDown + msg.err <- ErrGossiperShuttingDown } } } @@ -1879,23 +1912,11 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, return false } - // Add the premature message to our future messages which will - // be resent once the block height has reached. + // Add the premature message to our future messages which will be + // resent once the block height has reached. // - // Init an empty cached message and overwrite it if there are cached - // messages found. - cachedMsgs := &cachedNetworkMsg{ - msgs: make([]*processedNetworkMsg, 0), - } - - result, err := d.futureMsgs.Get(msgHeight) - // No error returned means we have old messages cached. - if err == nil { - cachedMsgs = result - } - - // Copy the networkMsgs since the old message's err chan will - // be consumed. + // Copy the networkMsgs since the old message's err chan will be + // consumed. copied := &networkMsg{ peer: msg.peer, source: msg.source, @@ -1905,15 +1926,15 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, err: make(chan error, 1), } - // The processed boolean is unused in the futureMsgs case. - pMsg := &processedNetworkMsg{msg: copied} + // Create the cached message. + cachedMsg := &cachedFutureMsg{ + msg: copied, + height: msgHeight, + } - // Add the network message. - msgs := cachedMsgs.msgs - msgs = append(msgs, pMsg) - _, err = d.futureMsgs.Put(msgHeight, &cachedNetworkMsg{ - msgs: msgs, - }) + // Increment the msg ID and add it to the cache. + nextMsgID := d.futureMsgID.Add(1) + _, err := d.futureMsgs.Put(nextMsgID, cachedMsg) if err != nil { log.Errorf("Adding future message got error: %v", err) } From 9bc7f0fb21a44b1762ecf08bf44293226d505955 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 3 Mar 2023 15:20:25 +0800 Subject: [PATCH 4/4] discovery: make `futureMsgs` into a struct and test This commit adds a simple struct `futureMsgCache` that embeds a lru cache with the message ID. A unit test is added to check the eviction behaves as expected. --- discovery/gossiper.go | 44 ++++++++++++++++++++++++++++---------- discovery/gossiper_test.go | 36 +++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index be7be8516..5eac87d91 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -430,8 +430,7 @@ type AuthenticatedGossiper struct { // the chan networkMsgs once the block height has reached. The cached // map format is, // {msgID1: msg1, msgID2: msg2, ...} - futureMsgs *lru.Cache[uint64, *cachedFutureMsg] - futureMsgID atomic.Uint64 + futureMsgs *futureMsgCache // chanPolicyUpdates is a channel that requests to update the // forwarding policy of a set of channels is sent over. @@ -484,13 +483,11 @@ type AuthenticatedGossiper struct { // passed configuration parameters. func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper { gossiper := &AuthenticatedGossiper{ - selfKey: selfKeyDesc.PubKey, - selfKeyLoc: selfKeyDesc.KeyLocator, - cfg: &cfg, - networkMsgs: make(chan *networkMsg), - futureMsgs: lru.NewCache[uint64, *cachedFutureMsg]( - maxFutureMessages, - ), + selfKey: selfKeyDesc.PubKey, + selfKeyLoc: selfKeyDesc.KeyLocator, + cfg: &cfg, + networkMsgs: make(chan *networkMsg), + futureMsgs: newFutureMsgCache(maxFutureMessages), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: lll @@ -638,7 +635,32 @@ func (d *AuthenticatedGossiper) syncBlockHeight() { } } -// cachedFutureMsg is a future message that's saved to the `futureMsgs` cache. +// futureMsgCache embeds a `lru.Cache` with a message counter that's served as +// the unique ID when saving the message. +type futureMsgCache struct { + *lru.Cache[uint64, *cachedFutureMsg] + + // msgID is a monotonically increased integer. + msgID atomic.Uint64 +} + +// nextMsgID returns a unique message ID. +func (f *futureMsgCache) nextMsgID() uint64 { + return f.msgID.Add(1) +} + +// newFutureMsgCache creates a new future message cache with the underlying lru +// cache being initialized with the specified capacity. +func newFutureMsgCache(capacity uint64) *futureMsgCache { + // Create a new cache. + cache := lru.NewCache[uint64, *cachedFutureMsg](capacity) + + return &futureMsgCache{ + Cache: cache, + } +} + +// cachedFutureMsg is a future message that's saved to the `futureMsgCache`. type cachedFutureMsg struct { // msg is the network message. msg *networkMsg @@ -1933,7 +1955,7 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, } // Increment the msg ID and add it to the cache. - nextMsgID := d.futureMsgID.Add(1) + nextMsgID := d.futureMsgs.nextMsgID() _, err := d.futureMsgs.Put(nextMsgID, cachedMsg) if err != nil { log.Errorf("Adding future message got error: %v", err) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 37823469c..cc2fdef05 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" + "github.com/lightninglabs/neutrino/cache" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -4098,3 +4099,38 @@ func TestRejectCacheChannelAnn(t *testing.T) { t.Fatal("did not process remote announcement") } } + +// TestFutureMsgCacheEviction checks that when the cache's capacity is reached, +// saving one more item will evict the oldest item. +func TestFutureMsgCacheEviction(t *testing.T) { + t.Parallel() + + // Create a future message cache with size 1. + c := newFutureMsgCache(1) + + // Send two messages to the cache, which ends in the first message + // being evicted. + // + // Put the first item. + id := c.nextMsgID() + evicted, err := c.Put(id, &cachedFutureMsg{height: uint32(id)}) + require.NoError(t, err) + require.False(t, evicted, "should not be evicted") + + // Put the second item. + id = c.nextMsgID() + evicted, err = c.Put(id, &cachedFutureMsg{height: uint32(id)}) + require.NoError(t, err) + require.True(t, evicted, "should be evicted") + + // The first item should have been evicted. + // + // NOTE: msg ID starts at 1, not 0. + _, err = c.Get(1) + require.ErrorIs(t, err, cache.ErrElementNotFound) + + // The second item should be found. + item, err := c.Get(2) + require.NoError(t, err) + require.EqualValues(t, 2, item.height, "should be the second item") +}