diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 075982af8..89b72ca32 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -12,6 +12,8 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/neutrino/cache" + "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -38,6 +40,15 @@ const ( // determine how often we should allow a new update for a specific // channel and direction. DefaultChannelUpdateInterval = time.Minute + + // maxPrematureUpdates tracks the max amount of premature channel + // updates that we'll hold onto. + maxPrematureUpdates = 100 + + // maxRejectedUpdates tracks the max amount of rejected channel updates + // we'll maintain. This is the global size across all peers. We'll + // allocate ~3 MB max to the cache. + maxRejectedUpdates = 10_000 ) var ( @@ -268,6 +279,46 @@ type Config struct { ChannelUpdateInterval time.Duration } +// cachedNetworkMsg is a wrapper around a network message that can be used with +// *lru.Cache. +type cachedNetworkMsg struct { + msgs []*networkMsg +} + +// Size returns the "size" of an entry. We return the number of items as we +// just want to limit the total amount of entires rather than do accurate size +// accounting. +func (c *cachedNetworkMsg) Size() (uint64, error) { + return uint64(len(c.msgs)), nil +} + +// rejectCacheKey is the cache key that we'll use to track announcements we've +// recently rejected. +type rejectCacheKey struct { + pubkey [33]byte + chanID uint64 +} + +// newRejectCacheKey returns a new cache key for the reject cache. +func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey { + k := rejectCacheKey{ + chanID: cid, + pubkey: pub, + } + + return k +} + +// cachedReject is the empty value used to track the value for rejects. +type cachedReject struct { +} + +// Size returns the "size" of an entry. We return 1 as we just want to limit +// the total size. +func (c *cachedReject) Size() (uint64, error) { + return 1, nil +} + // AuthenticatedGossiper is a subsystem which is responsible for receiving // announcements, validating them and applying the changes to router, syncing // lightning network with newly connected nodes, broadcasting announcements @@ -302,8 +353,7 @@ type AuthenticatedGossiper struct { // that wasn't associated with any channel we know about. We store // them temporarily, such that we can reprocess them when a // ChannelAnnouncement for the channel is received. - prematureChannelUpdates map[uint64][]*networkMsg - pChanUpdMtx sync.Mutex + prematureChannelUpdates *lru.Cache // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the @@ -327,8 +377,7 @@ type AuthenticatedGossiper struct { // consistent between when the DB is first read until it's written. channelMtx *multimutex.Mutex - rejectMtx sync.RWMutex - recentRejects map[uint64]struct{} + recentRejects *lru.Cache // syncMgr is a subsystem responsible for managing the gossip syncers // for peers currently connected. When a new peer is connected, the @@ -368,9 +417,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), - prematureChannelUpdates: make(map[uint64][]*networkMsg), + prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), channelMtx: multimutex.NewMutex(), - recentRejects: make(map[uint64]struct{}), + recentRejects: lru.NewCache(maxRejectedUpdates), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), } @@ -1018,7 +1067,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // If this message was recently rejected, then we won't // attempt to re-process it. - if d.isRecentlyRejectedMsg(announcement.msg) { + if announcement.isRemote && d.isRecentlyRejectedMsg( + announcement.msg, announcement.peer.PubKey(), + ) { announcement.err <- fmt.Errorf("recently " + "rejected") continue @@ -1184,22 +1235,23 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) { // isRecentlyRejectedMsg returns true if we recently rejected a message, and // false otherwise, This avoids expensive reprocessing of the message. -func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { - d.rejectMtx.RLock() - defer d.rejectMtx.RUnlock() +func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message, + peerPub [33]byte) bool { + var scid uint64 switch m := msg.(type) { case *lnwire.ChannelUpdate: - _, ok := d.recentRejects[m.ShortChannelID.ToUint64()] - return ok + scid = m.ShortChannelID.ToUint64() case *lnwire.ChannelAnnouncement: - _, ok := d.recentRejects[m.ShortChannelID.ToUint64()] - return ok + scid = m.ShortChannelID.ToUint64() default: return false } + + _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub)) + return err != cache.ErrElementNotFound } // retransmitStaleAnns examines all outgoing channels that the source node is @@ -1622,9 +1674,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.cfg.ChainHash) log.Errorf(err.Error()) - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) nMsg.err <- err return nil, false @@ -1662,9 +1716,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( if err := routing.ValidateChannelAnn(msg); err != nil { err := fmt.Errorf("unable to validate "+ "announcement: %v", err) - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) log.Error(err) nMsg.err <- err @@ -1735,9 +1792,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // see if we get any new announcements. anns, rErr := d.processRejectedEdge(msg, proof) if rErr != nil { - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + nMsg.err <- rErr return nil, false } @@ -1759,9 +1820,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Tracef("Router rejected channel "+ "edge: %v", err) - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) } nMsg.err <- err @@ -1774,13 +1837,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( shortChanID := msg.ShortChannelID.ToUint64() var channelUpdates []*networkMsg - d.pChanUpdMtx.Lock() - channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...) - - // Now delete the premature ChannelUpdates, since we added them - // all to the queue of network messages. - delete(d.prematureChannelUpdates, shortChanID) - d.pChanUpdMtx.Unlock() + earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + if err == nil { + // There was actually an entry in the map, so we'll + // accumulate it. We don't worry about deletion, since + // it'll eventually fall out anyway. + chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) + channelUpdates = append(channelUpdates, chanMsgs.msgs...) + } // Launch a new goroutine to handle each ChannelUpdate, this to // ensure we don't block here, as we can handle only one @@ -1843,9 +1907,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( d.cfg.ChainHash) log.Errorf(err.Error()) - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) nMsg.err <- err return nil, false @@ -1929,11 +1995,28 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // of this, we temporarily add it to a map, and // reprocess it after our own ChannelAnnouncement has // been processed. - d.pChanUpdMtx.Lock() - d.prematureChannelUpdates[shortChanID] = append( - d.prematureChannelUpdates[shortChanID], nMsg, + earlyMsgs, err := d.prematureChannelUpdates.Get( + shortChanID, ) - d.pChanUpdMtx.Unlock() + switch { + // Nothing in the cache yet, we can just directly + // insert this element. + case err == cache.ErrElementNotFound: + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: []*networkMsg{nMsg}, + }) + + // There's already something in the cache, so we'll + // combine the set of messages into a single value. + default: + msgs := earlyMsgs.(*cachedNetworkMsg).msgs + msgs = append(msgs, nMsg) + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: msgs, + }) + } log.Debugf("Got ChannelUpdate for edge not found in "+ "graph(shortChanID=%v), saving for "+ @@ -1950,9 +2033,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( log.Error(err) nMsg.err <- err - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + return nil, false } @@ -2055,9 +2141,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( routing.ErrIgnored) { log.Debug(err) } else if err != routing.ErrVBarrierShuttingDown { - d.rejectMtx.Lock() - d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{} - d.rejectMtx.Unlock() + + key := newRejectCacheKey( + msg.ShortChannelID.ToUint64(), + nMsg.peer.PubKey(), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + log.Error(err) } diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index 755ce07c5..5ace7ed2a 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -370,6 +370,8 @@ messages directly. There is no routing/path finding involved. * Using `go get` to install go executables is now deprecated. Migrate to `go install` our lnrpc proto dockerfile [Migrate `go get` to `go install`](https://github.com/lightningnetwork/lnd/pull/5879) +* [The premature update map has been revamped using an LRU cache](https://github.com/lightningnetwork/lnd/pull/5902) + ## Code Health ### Code cleanup, refactor, typo fixes