discovery: use token bucket based rate limiting to throttle gossip

The recently added gossip throttling was shown to be too aggressive,
especially with our auto channel enable/disable signaling. We switch to
a token bucket based system instead as it's based on time, rather than a
block height which isn't constantly updated at a given rate.
This commit is contained in:
Wilmer Paulino
2021-02-10 15:35:37 -08:00
parent 1f1f7be770
commit 83a0d03c0b
2 changed files with 92 additions and 46 deletions

View File

@@ -22,6 +22,19 @@ import (
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"golang.org/x/time/rate"
)
const (
// DefaultMaxChannelUpdateBurst is the default maximum number of updates
// for a specific channel and direction that we'll accept over an
// interval.
DefaultMaxChannelUpdateBurst = 10
// DefaultChannelUpdateInterval is the default interval we'll use to
// determine how often we should allow a new update for a specific
// channel and direction.
DefaultChannelUpdateInterval = time.Minute
)
var (
@@ -237,6 +250,15 @@ type Config struct {
// ActiveSync upon connection. These peers will never transition to
// PassiveSync.
PinnedSyncers PinnedSyncers
// MaxChannelUpdateBurst specifies the maximum number of updates for a
// specific channel and direction that we'll accept over an interval.
MaxChannelUpdateBurst int
// ChannelUpdateInterval specifies the interval we'll use to determine
// how often we should allow a new update for a specific channel and
// direction.
ChannelUpdateInterval time.Duration
}
// AuthenticatedGossiper is a subsystem which is responsible for receiving
@@ -313,12 +335,14 @@ type AuthenticatedGossiper struct {
// network.
reliableSender *reliableSender
// heightForLastChanUpdate keeps track of the height at which we
// processed the latest channel update for a specific direction.
// chanUpdateRateLimiter contains rate limiters for each direction of
// a channel update we've processed. We'll use these to determine
// whether we should accept a new update for a specific channel and
// direction.
//
// NOTE: This map must be synchronized with the main
// AuthenticatedGossiper lock.
heightForLastChanUpdate map[uint64][2]uint32
chanUpdateRateLimiter map[uint64][2]*rate.Limiter
sync.Mutex
}
@@ -335,7 +359,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32),
chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
}
gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
@@ -1950,21 +1974,32 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
} else {
// If it's not, we'll only allow a single update
// for this channel per block.
// If it's not, we'll allow an update per minute
// with a maximum burst of 10. If we haven't
// seen an update for this channel before, we'll
// need to initialize a rate limiter for each
// direction.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
if lastUpdateHeight[direction] == d.bestHeight {
log.Debugf("Ignoring update for "+
"channel %v due to previous "+
"update occurring within the "+
"same block %v", shortChanID,
d.bestHeight)
d.Unlock()
rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID]
if !ok {
r := rate.Every(d.cfg.ChannelUpdateInterval)
b := d.cfg.MaxChannelUpdateBurst
rateLimiters = [2]*rate.Limiter{
rate.NewLimiter(r, b),
rate.NewLimiter(r, b),
}
d.chanUpdateRateLimiter[shortChanID] = rateLimiters
}
d.Unlock()
if !rateLimiters[direction].Allow() {
log.Debugf("Rate limiting update for "+
"channel %v from direction %x",
shortChanID,
pubKey.SerializeCompressed())
nMsg.err <- nil
return nil
}
d.Unlock()
}
}
@@ -2011,15 +2046,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
// With the edge successfully updated on disk, we'll note the
// current height so that we're able to rate limit any future
// updates for the same channel.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
lastUpdateHeight[direction] = d.bestHeight
d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight
d.Unlock()
// If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However,