From 8627b5d128f79d720a0359884651e0c4ed41040b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 28 Oct 2021 17:16:12 -0700 Subject: [PATCH] discovery: revamp premature update map Turns out we need it right now to handle some low latency race conditions in our integration tests, so we'll opt to simply cap the size of it to a low amount. We use a basic LRU caching mechainsm. Fixes https://github.com/lightningnetwork/lnd/issues/5076 --- discovery/gossiper.go | 62 +++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 075982af8..4b98ec9df 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -12,6 +12,8 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/neutrino/cache" + "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -38,6 +40,10 @@ const ( // determine how often we should allow a new update for a specific // channel and direction. DefaultChannelUpdateInterval = time.Minute + + // maxPrematureUpdates tracks the max amount of premature channel + // updates that we'll hold onto. + maxPrematureUpdates = 100 ) var ( @@ -268,6 +274,19 @@ type Config struct { ChannelUpdateInterval time.Duration } +// cachedNetworkMsg is a wrapper around a network message that can be used with +// *lru.Cache. +type cachedNetworkMsg struct { + msgs []*networkMsg +} + +// Size returns the "size" of an entry. We return the number of items as we +// just want to limit the total amount of entires rather than do accurate size +// accounting. +func (c *cachedNetworkMsg) Size() (uint64, error) { + return uint64(len(c.msgs)), nil +} + // AuthenticatedGossiper is a subsystem which is responsible for receiving // announcements, validating them and applying the changes to router, syncing // lightning network with newly connected nodes, broadcasting announcements @@ -302,8 +321,7 @@ type AuthenticatedGossiper struct { // that wasn't associated with any channel we know about. We store // them temporarily, such that we can reprocess them when a // ChannelAnnouncement for the channel is received. - prematureChannelUpdates map[uint64][]*networkMsg - pChanUpdMtx sync.Mutex + prematureChannelUpdates *lru.Cache // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the @@ -368,7 +386,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), - prematureChannelUpdates: make(map[uint64][]*networkMsg), + prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), @@ -1774,13 +1792,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID := msg.ShortChannelID.ToUint64() var channelUpdates []*networkMsg - d.pChanUpdMtx.Lock() - channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...) - - // Now delete the premature ChannelUpdates, since we added them - // all to the queue of network messages. - delete(d.prematureChannelUpdates, shortChanID) - d.pChanUpdMtx.Unlock() + earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + if err == nil { + // There was actually an entry in the map, so we'll + // accumulate it. We don't worry about deletion, since + // it'll eventually fall out anyway. + chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) + channelUpdates = append(channelUpdates, chanMsgs.msgs...) + } // Launch a new goroutine to handle each ChannelUpdate, this to // ensure we don't block here, as we can handle only one @@ -1929,11 +1948,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // of this, we temporarily add it to a map, and // reprocess it after our own ChannelAnnouncement has // been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], nMsg, + earlyMsgs, err := d.prematureChannelUpdates.Get( + shortChanID, ) - d.pChanUpdMtx.Unlock() + switch { + // Nothing in the cache yeyt, we can just directly + // insert this element. + case err == cache.ErrElementNotFound: + d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{ + msgs: []*networkMsg{nMsg}, + }) + + // There's already something in the cache, so we'll + // combine the set of messagesa into a single value. + default: + msgs := earlyMsgs.(*cachedNetworkMsg).msgs + msgs = append(msgs, nMsg) + d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{ + msgs: msgs, + }) + } log.Debugf("Got ChannelUpdate for edge not found in "+ "graph(shortChanID=%v), saving for "+