Merge pull request #5902 from Roasbeef/lru-chan-upd

discovery: revamp premature update map
Olaoluwa Osuntokun 2021-11-04 16:46:28 -07:00 committed by GitHub
commit 7d6915d17c
2 changed files with 138 additions and 46 deletions
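
This change replaces two unbounded, mutex-guarded maps in the gossiper, `prematureChannelUpdates` and `recentRejects`, with bounded LRU caches from neutrino's `cache/lru` package, putting a hard cap on the memory these structures can consume. The sketch below shows the cache API the diff relies on, assuming the non-generic `lru.Cache` of the time: values implement the `cache.Value` interface via `Size()`, and a miss surfaces as `cache.ErrElementNotFound`.

```go
package main

import (
	"fmt"

	"github.com/lightninglabs/neutrino/cache"
	"github.com/lightninglabs/neutrino/cache/lru"
)

// demoEntry is a toy cache.Value. Size returning 1 makes the cache cap
// the number of entries rather than their byte size.
type demoEntry struct{ scid uint64 }

func (e *demoEntry) Size() (uint64, error) { return 1, nil }

func main() {
	// Capacity 2: inserting a third entry evicts the least recently
	// used one.
	c := lru.NewCache(2)

	_, _ = c.Put(uint64(1), &demoEntry{scid: 1})
	_, _ = c.Put(uint64(2), &demoEntry{scid: 2})
	_, _ = c.Put(uint64(3), &demoEntry{scid: 3}) // evicts key 1

	if _, err := c.Get(uint64(1)); err == cache.ErrElementNotFound {
		fmt.Println("key 1 was evicted")
	}
}
```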

discovery/gossiper.go

@@ -12,6 +12,8 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/btcsuite/btcutil"
 	"github.com/davecgh/go-spew/spew"
+	"github.com/lightninglabs/neutrino/cache"
+	"github.com/lightninglabs/neutrino/cache/lru"
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
@ -38,6 +40,15 @@ const (
// determine how often we should allow a new update for a specific // determine how often we should allow a new update for a specific
// channel and direction. // channel and direction.
DefaultChannelUpdateInterval = time.Minute DefaultChannelUpdateInterval = time.Minute
// maxPrematureUpdates tracks the max amount of premature channel
// updates that we'll hold onto.
maxPrematureUpdates = 100
// maxRejectedUpdates tracks the max amount of rejected channel updates
// we'll maintain. This is the global size across all peers. We'll
// allocate ~3 MB max to the cache.
maxRejectedUpdates = 10_000
) )
var ( var (
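The ~3 MB figure for `maxRejectedUpdates` works out to roughly 300 bytes per entry. A back-of-the-envelope sketch of that budget follows; the `rejectCacheKey` type mirrors the struct added below, and the exact per-entry overhead depends on the Go runtime and the cache internals.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Mirrors rejectCacheKey from this diff: a 33-byte pubkey plus an
// 8-byte channel ID, padded to 48 bytes by Go's alignment rules.
type rejectCacheKey struct {
	pubkey [33]byte
	chanID uint64
}

func main() {
	fmt.Println(unsafe.Sizeof(rejectCacheKey{})) // 48 bytes raw

	// Map buckets, the LRU's list element, and interface headers add
	// overhead on top of the raw key, so ~300 bytes/entry is a rough
	// envelope: 10,000 entries * ~300 B ≈ 3 MB, matching the comment.
	fmt.Printf("~%.1f MB\n", float64(10_000*300)/1e6)
}
```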
@@ -268,6 +279,46 @@ type Config struct {
 	ChannelUpdateInterval time.Duration
 }
 
+// cachedNetworkMsg is a wrapper around a network message that can be used
+// with *lru.Cache.
+type cachedNetworkMsg struct {
+	msgs []*networkMsg
+}
+
+// Size returns the "size" of an entry. We return the number of items as we
+// just want to limit the total amount of entries rather than do accurate
+// size accounting.
+func (c *cachedNetworkMsg) Size() (uint64, error) {
+	return uint64(len(c.msgs)), nil
+}
+
+// rejectCacheKey is the cache key that we'll use to track announcements
+// we've recently rejected.
+type rejectCacheKey struct {
+	pubkey [33]byte
+	chanID uint64
+}
+
+// newRejectCacheKey returns a new cache key for the reject cache.
+func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
+	k := rejectCacheKey{
+		chanID: cid,
+		pubkey: pub,
+	}
+
+	return k
+}
+
+// cachedReject is the empty value used to track the value for rejects.
+type cachedReject struct {
+}
+
+// Size returns the "size" of an entry. We return 1 as we just want to limit
+// the total size.
+func (c *cachedReject) Size() (uint64, error) {
+	return 1, nil
+}
+
 // AuthenticatedGossiper is a subsystem which is responsible for receiving
 // announcements, validating them and applying the changes to router, syncing
 // lightning network with newly connected nodes, broadcasting announcements
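Note the asymmetry between the two `Size` implementations: `cachedReject` always counts as 1, while `cachedNetworkMsg` counts its buffered messages. Under that reading, `maxPrematureUpdates = 100` bounds the total number of buffered updates across all channels, not the number of distinct channels. A runnable sketch of that behavior, using stand-in types rather than lnd's:

```go
package main

import (
	"fmt"

	"github.com/lightninglabs/neutrino/cache/lru"
)

// msgBatch stands in for cachedNetworkMsg: Size reports how many
// messages the entry buffers, not its byte size.
type msgBatch struct{ msgs []string }

func (b *msgBatch) Size() (uint64, error) { return uint64(len(b.msgs)), nil }

func main() {
	// Capacity 5 plays the role of maxPrematureUpdates = 100.
	c := lru.NewCache(5)

	// One entry with three messages consumes 3 of the 5 slots...
	_, _ = c.Put(uint64(42), &msgBatch{msgs: []string{"a", "b", "c"}})

	// ...so inserting another 3-message entry overflows the budget and
	// evicts the older channel's batch wholesale.
	_, _ = c.Put(uint64(43), &msgBatch{msgs: []string{"d", "e", "f"}})

	if _, err := c.Get(uint64(42)); err != nil {
		fmt.Println("channel 42's batch evicted:", err)
	}
}
```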
@@ -302,8 +353,7 @@ type AuthenticatedGossiper struct {
 	// that wasn't associated with any channel we know about. We store
 	// them temporarily, such that we can reprocess them when a
 	// ChannelAnnouncement for the channel is received.
-	prematureChannelUpdates map[uint64][]*networkMsg
-	pChanUpdMtx             sync.Mutex
+	prematureChannelUpdates *lru.Cache
 
 	// networkMsgs is a channel that carries new network broadcasted
 	// message from outside the gossiper service to be processed by the
@@ -327,8 +377,7 @@ type AuthenticatedGossiper struct {
 	// consistent between when the DB is first read until it's written.
 	channelMtx *multimutex.Mutex
 
-	rejectMtx     sync.RWMutex
-	recentRejects map[uint64]struct{}
+	recentRejects *lru.Cache
 
 	// syncMgr is a subsystem responsible for managing the gossip syncers
 	// for peers currently connected. When a new peer is connected, the
@@ -368,9 +417,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 		networkMsgs:             make(chan *networkMsg),
 		quit:                    make(chan struct{}),
 		chanPolicyUpdates:       make(chan *chanPolicyUpdateRequest),
-		prematureChannelUpdates: make(map[uint64][]*networkMsg),
+		prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
 		channelMtx:              multimutex.NewMutex(),
-		recentRejects:           make(map[uint64]struct{}),
+		recentRejects:           lru.NewCache(maxRejectedUpdates),
 		chanUpdateRateLimiter:   make(map[uint64][2]*rate.Limiter),
 	}
@@ -1018,7 +1067,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
 			// If this message was recently rejected, then we won't
 			// attempt to re-process it.
-			if d.isRecentlyRejectedMsg(announcement.msg) {
+			if announcement.isRemote && d.isRecentlyRejectedMsg(
+				announcement.msg, announcement.peer.PubKey(),
+			) {
 				announcement.err <- fmt.Errorf("recently " +
 					"rejected")
 				continue
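The new `announcement.isRemote` guard restricts the reject cache to remote announcements. A practical side effect: since `&&` short-circuits, `announcement.peer.PubKey()` is never evaluated for locally sourced messages, which may carry no sending peer to key on. A toy illustration with hypothetical types (not lnd's):

```go
package main

import "fmt"

type peer struct{ pk [33]byte }

func (p *peer) PubKey() [33]byte { return p.pk }

type announcement struct {
	isRemote bool
	peer     *peer // nil for locally sourced announcements
}

// lookupReject stands in for the recentRejects cache lookup.
func lookupReject(pub [33]byte) bool { return false }

func recentlyRejected(a announcement) bool {
	// && short-circuits: for a local announcement a.peer may be nil and
	// a.peer.PubKey() would panic, so the isRemote check must run
	// first, mirroring the guard added in this hunk.
	return a.isRemote && lookupReject(a.peer.PubKey())
}

func main() {
	local := announcement{isRemote: false}
	fmt.Println(recentlyRejected(local)) // false, and no nil deref
}
```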
@@ -1184,22 +1235,23 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
 // isRecentlyRejectedMsg returns true if we recently rejected a message, and
 // false otherwise. This avoids expensive reprocessing of the message.
-func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
-	d.rejectMtx.RLock()
-	defer d.rejectMtx.RUnlock()
+func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
+	peerPub [33]byte) bool {
 
+	var scid uint64
 	switch m := msg.(type) {
 	case *lnwire.ChannelUpdate:
-		_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
-		return ok
+		scid = m.ShortChannelID.ToUint64()
 
 	case *lnwire.ChannelAnnouncement:
-		_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
-		return ok
+		scid = m.ShortChannelID.ToUint64()
 
 	default:
 		return false
 	}
+
+	_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
+	return err != cache.ErrElementNotFound
 }
 
 // retransmitStaleAnns examines all outgoing channels that the source node is
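Keying rejects by `(peer, scid)` rather than by `scid` alone means one spammy or buggy peer cannot poison a channel for everyone: the same announcement relayed by an honest peer still gets processed. A small sketch of the keying, with the struct mirroring the one added in this PR:

```go
package main

import "fmt"

// rejectCacheKey mirrors the struct added in this PR.
type rejectCacheKey struct {
	pubkey [33]byte
	chanID uint64
}

func main() {
	var peerA, peerB [33]byte
	peerA[0], peerB[0] = 0x02, 0x03

	scid := uint64(700_000) << 40 // hypothetical short channel ID

	keyA := rejectCacheKey{pubkey: peerA, chanID: scid}
	keyB := rejectCacheKey{pubkey: peerB, chanID: scid}

	// Distinct peers yield distinct keys: a rejection recorded against
	// peerA's copy does not block the same channel announcement when
	// peerB later relays a valid one.
	fmt.Println("same key?", keyA == keyB) // false
}
```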
@@ -1622,9 +1674,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 				d.cfg.ChainHash)
 			log.Errorf(err.Error())
 
-			d.rejectMtx.Lock()
-			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-			d.rejectMtx.Unlock()
+			key := newRejectCacheKey(
+				msg.ShortChannelID.ToUint64(),
+				nMsg.peer.PubKey(),
+			)
+			_, _ = d.recentRejects.Put(key, &cachedReject{})
 
 			nMsg.err <- err
 			return nil, false
@@ -1662,9 +1716,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 		if err := routing.ValidateChannelAnn(msg); err != nil {
 			err := fmt.Errorf("unable to validate "+
 				"announcement: %v", err)
-			d.rejectMtx.Lock()
-			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-			d.rejectMtx.Unlock()
+
+			key := newRejectCacheKey(
+				msg.ShortChannelID.ToUint64(),
+				nMsg.peer.PubKey(),
+			)
+			_, _ = d.recentRejects.Put(key, &cachedReject{})
 
 			log.Error(err)
 			nMsg.err <- err
@@ -1735,9 +1792,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			// see if we get any new announcements.
 			anns, rErr := d.processRejectedEdge(msg, proof)
 			if rErr != nil {
-				d.rejectMtx.Lock()
-				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-				d.rejectMtx.Unlock()
+
+				key := newRejectCacheKey(
+					msg.ShortChannelID.ToUint64(),
+					nMsg.peer.PubKey(),
+				)
+				_, _ = d.recentRejects.Put(key, &cachedReject{})
+
 				nMsg.err <- rErr
 				return nil, false
 			}
@@ -1759,9 +1820,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 				log.Tracef("Router rejected channel "+
 					"edge: %v", err)
 
-				d.rejectMtx.Lock()
-				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-				d.rejectMtx.Unlock()
+				key := newRejectCacheKey(
+					msg.ShortChannelID.ToUint64(),
+					nMsg.peer.PubKey(),
+				)
+				_, _ = d.recentRejects.Put(key, &cachedReject{})
 			}
 
 			nMsg.err <- err
@@ -1774,13 +1837,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 		shortChanID := msg.ShortChannelID.ToUint64()
 		var channelUpdates []*networkMsg
 
-		d.pChanUpdMtx.Lock()
-		channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)
-
-		// Now delete the premature ChannelUpdates, since we added them
-		// all to the queue of network messages.
-		delete(d.prematureChannelUpdates, shortChanID)
-		d.pChanUpdMtx.Unlock()
+		earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
+		if err == nil {
+			// There was actually an entry in the map, so we'll
+			// accumulate it. We don't worry about deletion, since
+			// it'll eventually fall out anyway.
+			chanMsgs := earlyChanUpdates.(*cachedNetworkMsg)
+			channelUpdates = append(channelUpdates, chanMsgs.msgs...)
+		}
 
 		// Launch a new goroutine to handle each ChannelUpdate, this to
 		// ensure we don't block here, as we can handle only one
@@ -1843,9 +1907,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 				d.cfg.ChainHash)
 			log.Errorf(err.Error())
 
-			d.rejectMtx.Lock()
-			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-			d.rejectMtx.Unlock()
+			key := newRejectCacheKey(
+				msg.ShortChannelID.ToUint64(),
+				nMsg.peer.PubKey(),
+			)
+			_, _ = d.recentRejects.Put(key, &cachedReject{})
 
 			nMsg.err <- err
 			return nil, false
@@ -1929,11 +1995,28 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			// of this, we temporarily add it to a map, and
 			// reprocess it after our own ChannelAnnouncement has
 			// been processed.
-			d.pChanUpdMtx.Lock()
-			d.prematureChannelUpdates[shortChanID] = append(
-				d.prematureChannelUpdates[shortChanID], nMsg,
-			)
-			d.pChanUpdMtx.Unlock()
+			earlyMsgs, err := d.prematureChannelUpdates.Get(
+				shortChanID,
+			)
+			switch {
+			// Nothing in the cache yet, we can just directly
+			// insert this element.
+			case err == cache.ErrElementNotFound:
+				_, _ = d.prematureChannelUpdates.Put(
+					shortChanID, &cachedNetworkMsg{
+						msgs: []*networkMsg{nMsg},
+					})
+
+			// There's already something in the cache, so we'll
+			// combine the set of messages into a single value.
+			default:
+				msgs := earlyMsgs.(*cachedNetworkMsg).msgs
+				msgs = append(msgs, nMsg)
+				_, _ = d.prematureChannelUpdates.Put(
+					shortChanID, &cachedNetworkMsg{
+						msgs: msgs,
+					})
+			}
 
 			log.Debugf("Got ChannelUpdate for edge not found in "+
 				"graph(shortChanID=%v), saving for "+
@@ -1950,9 +2033,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 			log.Error(err)
 			nMsg.err <- err
 
-			d.rejectMtx.Lock()
-			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-			d.rejectMtx.Unlock()
+			key := newRejectCacheKey(
+				msg.ShortChannelID.ToUint64(),
+				nMsg.peer.PubKey(),
+			)
+			_, _ = d.recentRejects.Put(key, &cachedReject{})
+
 			return nil, false
 		}
@@ -2055,9 +2141,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
 				routing.ErrIgnored) {
 				log.Debug(err)
 			} else if err != routing.ErrVBarrierShuttingDown {
-				d.rejectMtx.Lock()
-				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
-				d.rejectMtx.Unlock()
+
+				key := newRejectCacheKey(
+					msg.ShortChannelID.ToUint64(),
+					nMsg.peer.PubKey(),
+				)
+				_, _ = d.recentRejects.Put(key, &cachedReject{})
+
 				log.Error(err)
 			}

docs/release-notes/release-notes-0.14.0.md

@@ -370,6 +370,8 @@ messages directly. There is no routing/path finding involved.
 * Using `go get` to install go executables is now deprecated. Migrate to `go install` our lnrpc proto dockerfile [Migrate `go get` to `go install`](https://github.com/lightningnetwork/lnd/pull/5879)
 
+* [The premature update map has been revamped using an LRU cache](https://github.com/lightningnetwork/lnd/pull/5902)
+
 ## Code Health
 
 ### Code cleanup, refactor, typo fixes