discovery: introduce rate limiter to GossipSyncer

This commit is contained in:
yyforyongyu
2025-07-22 19:16:08 +08:00
parent 19bc941cbd
commit 6826703c77
2 changed files with 29 additions and 2 deletions

View File

@@ -40,6 +40,11 @@ const (
// they'll be refilled at this rate. // they'll be refilled at this rate.
DefaultMsgBytesPerSecond = 1000 * 1_024 DefaultMsgBytesPerSecond = 1000 * 1_024
// DefaultPeerMsgBytesPerSecond is the max bytes/s we'll permit for
// outgoing messages for a single peer. Once tokens (bytes) have been
// taken from the bucket, they'll be refilled at this rate.
DefaultPeerMsgBytesPerSecond = 50 * 1_024
// assumedMsgSize is the assumed size of a message if we can't compute // assumedMsgSize is the assumed size of a message if we can't compute
// its serialized size. This comes out to 1 KB. // its serialized size. This comes out to 1 KB.
assumedMsgSize = 1_024 assumedMsgSize = 1_024

View File

@@ -17,6 +17,7 @@ import (
graphdb "github.com/lightningnetwork/lnd/graph/db" graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate"
) )
// SyncerType encapsulates the different types of syncing mechanisms for a // SyncerType encapsulates the different types of syncing mechanisms for a
@@ -295,6 +296,11 @@ type gossipSyncerCfg struct {
// timestampQueueSize is the size of the timestamp range queue. If not // timestampQueueSize is the size of the timestamp range queue. If not
// set, defaults to the global timestampQueueSize constant. // set, defaults to the global timestampQueueSize constant.
timestampQueueSize int timestampQueueSize int
// msgBytesPerSecond is the allotted bandwidth rate, expressed in
// bytes/second that this gossip syncer can consume. Once we exceed this
// rate, message sending will block until we're below the rate.
msgBytesPerSecond uint64
} }
// GossipSyncer is a struct that handles synchronizing the channel graph state // GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -407,6 +413,10 @@ type GossipSyncer struct {
// allows contexts that either block or cancel on those depending on // allows contexts that either block or cancel on those depending on
// the use case. // the use case.
cg *fn.ContextGuard cg *fn.ContextGuard
// rateLimiter dictates the frequency with which we will reply to gossip
// queries to this peer.
rateLimiter *rate.Limiter
} }
// newGossipSyncer returns a new instance of the GossipSyncer populated using // newGossipSyncer returns a new instance of the GossipSyncer populated using
@@ -418,6 +428,17 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
queueSize = defaultTimestampQueueSize queueSize = defaultTimestampQueueSize
} }
bytesPerSecond := cfg.msgBytesPerSecond
if bytesPerSecond == 0 {
bytesPerSecond = DefaultPeerMsgBytesPerSecond
}
bytesBurst := 2 * bytesPerSecond
// We'll use this rate limiter to limit this single peer.
rateLimiter := rate.NewLimiter(
rate.Limit(bytesPerSecond), int(bytesBurst),
)
return &GossipSyncer{ return &GossipSyncer{
cfg: cfg, cfg: cfg,
syncTransitionReqs: make(chan *syncTransitionReq), syncTransitionReqs: make(chan *syncTransitionReq),
@@ -429,6 +450,7 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
), ),
syncerSema: sema, syncerSema: sema,
cg: fn.NewContextGuard(), cg: fn.NewContextGuard(),
rateLimiter: rateLimiter,
} }
} }