diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 075982af8..4b98ec9df 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -12,6 +12,8 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/neutrino/cache" + "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -38,6 +40,10 @@ const ( // determine how often we should allow a new update for a specific // channel and direction. DefaultChannelUpdateInterval = time.Minute + + // maxPrematureUpdates tracks the max amount of premature channel + // updates that we'll hold onto. + maxPrematureUpdates = 100 ) var ( @@ -268,6 +274,19 @@ type Config struct { ChannelUpdateInterval time.Duration } +// cachedNetworkMsg is a wrapper around a network message that can be used with +// *lru.Cache. +type cachedNetworkMsg struct { + msgs []*networkMsg +} + +// Size returns the "size" of an entry. We return the number of items as we +// just want to limit the total amount of entires rather than do accurate size +// accounting. +func (c *cachedNetworkMsg) Size() (uint64, error) { + return uint64(len(c.msgs)), nil +} + // AuthenticatedGossiper is a subsystem which is responsible for receiving // announcements, validating them and applying the changes to router, syncing // lightning network with newly connected nodes, broadcasting announcements @@ -302,8 +321,7 @@ type AuthenticatedGossiper struct { // that wasn't associated with any channel we know about. We store // them temporarily, such that we can reprocess them when a // ChannelAnnouncement for the channel is received. - prematureChannelUpdates map[uint64][]*networkMsg - pChanUpdMtx sync.Mutex + prematureChannelUpdates *lru.Cache // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the @@ -368,7 +386,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), - prematureChannelUpdates: make(map[uint64][]*networkMsg), + prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), @@ -1774,13 +1792,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID := msg.ShortChannelID.ToUint64() var channelUpdates []*networkMsg - d.pChanUpdMtx.Lock() - channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...) - - // Now delete the premature ChannelUpdates, since we added them - // all to the queue of network messages. - delete(d.prematureChannelUpdates, shortChanID) - d.pChanUpdMtx.Unlock() + earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + if err == nil { + // There was actually an entry in the map, so we'll + // accumulate it. We don't worry about deletion, since + // it'll eventually fall out anyway. + chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) + channelUpdates = append(channelUpdates, chanMsgs.msgs...) + } // Launch a new goroutine to handle each ChannelUpdate, this to // ensure we don't block here, as we can handle only one @@ -1929,11 +1948,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // of this, we temporarily add it to a map, and // reprocess it after our own ChannelAnnouncement has // been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], nMsg, + earlyMsgs, err := d.prematureChannelUpdates.Get( + shortChanID, ) - d.pChanUpdMtx.Unlock() + switch { + // Nothing in the cache yeyt, we can just directly + // insert this element. + case err == cache.ErrElementNotFound: + d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{ + msgs: []*networkMsg{nMsg}, + }) + + // There's already something in the cache, so we'll + // combine the set of messagesa into a single value. + default: + msgs := earlyMsgs.(*cachedNetworkMsg).msgs + msgs = append(msgs, nMsg) + d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{ + msgs: msgs, + }) + } log.Debugf("Got ChannelUpdate for edge not found in "+ "graph(shortChanID=%v), saving for "+