discovery: rate limiting sending msgs per peer

We now add another layer of rate limiting before sending the messages
inside `GossipSyncer`.
This commit is contained in:
yyforyongyu
2025-07-22 19:17:32 +08:00
parent 6826703c77
commit 778456769a
4 changed files with 79 additions and 45 deletions

View File

@@ -663,15 +663,10 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
encodingType: encoding, encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding], chunkSize: encodingTypeToChunkSize[encoding],
batchSize: requestBatchSize, batchSize: requestBatchSize,
sendToPeer: func(ctx context.Context, sendMsg: func(ctx context.Context, sync bool,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
return m.sendMessages(ctx, false, peer, nodeID, msgs...) return m.sendMessages(ctx, sync, peer, nodeID, msgs...)
},
sendToPeerSync: func(ctx context.Context,
msgs ...lnwire.Message) error {
return m.sendMessages(ctx, true, peer, nodeID, msgs...)
}, },
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters, ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
bestHeight: m.cfg.BestHeight, bestHeight: m.cfg.BestHeight,

View File

@@ -246,15 +246,10 @@ type gossipSyncerCfg struct {
// the remote node in a single QueryShortChanIDs request. // the remote node in a single QueryShortChanIDs request.
batchSize int32 batchSize int32
// sendToPeer sends a variadic number of messages to the remote peer. // sendMsg sends a variadic number of messages to the remote peer.
// This method should not block while waiting for sends to be written // The boolean indicates whether this method should be blocked or not
// to the wire. // while waiting for sends to be written to the wire.
sendToPeer func(context.Context, ...lnwire.Message) error sendMsg func(context.Context, bool, ...lnwire.Message) error
// sendToPeerSync sends a variadic number of messages to the remote
// peer, blocking until all messages have been sent successfully or a
// write error is encountered.
sendToPeerSync func(context.Context, ...lnwire.Message) error
// noSyncChannels will prevent the GossipSyncer from spawning a // noSyncChannels will prevent the GossipSyncer from spawning a
// channelGraphSyncer, meaning we will not try to reconcile unknown // channelGraphSyncer, meaning we will not try to reconcile unknown
@@ -518,7 +513,7 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
// Send the msg to the remote peer, which is non-blocking as // Send the msg to the remote peer, which is non-blocking as
// `sendToPeer` only queues the msg in Brontide. // `sendToPeer` only queues the msg in Brontide.
err = g.cfg.sendToPeer(ctx, queryRangeMsg) err = g.sendToPeer(ctx, queryRangeMsg)
if err != nil { if err != nil {
log.Errorf("Unable to send chan range query: %v", err) log.Errorf("Unable to send chan range query: %v", err)
return return
@@ -805,11 +800,11 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
TimestampRange: timestampRange, TimestampRange: timestampRange,
} }
if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil { if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
return err return err
} }
if firstTimestamp == zeroTimestamp && timestampRange == 0 { if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
g.localUpdateHorizon = nil g.localUpdateHorizon = nil
} else { } else {
g.localUpdateHorizon = localUpdateHorizon g.localUpdateHorizon = localUpdateHorizon
@@ -861,7 +856,7 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
// With our chunk obtained, we'll send over our next query, then return // With our chunk obtained, we'll send over our next query, then return
// false indicating that we're net yet fully synced. // false indicating that we're net yet fully synced.
err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{ err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
ChainHash: g.cfg.chainHash, ChainHash: g.cfg.chainHash,
EncodingType: lnwire.EncodingSortedPlain, EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: queryChunk, ShortChanIDs: queryChunk,
@@ -1176,7 +1171,7 @@ func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
"chain=%v, we're on chain=%v", query.ChainHash, "chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash) g.cfg.chainHash)
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
FirstBlockHeight: query.FirstBlockHeight, FirstBlockHeight: query.FirstBlockHeight,
NumBlocks: query.NumBlocks, NumBlocks: query.NumBlocks,
@@ -1249,7 +1244,7 @@ func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
) )
} }
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
NumBlocks: numBlocks, NumBlocks: numBlocks,
FirstBlockHeight: firstHeight, FirstBlockHeight: firstHeight,
@@ -1355,7 +1350,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
"chain=%v, we're on chain=%v", query.ChainHash, "chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash) g.cfg.chainHash)
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 0, Complete: 0,
}) })
@@ -1386,7 +1381,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
// each one individually and synchronously to throttle the sends and // each one individually and synchronously to throttle the sends and
// perform buffering of responses in the syncer as opposed to the peer. // perform buffering of responses in the syncer as opposed to the peer.
for _, msg := range replyMsgs { for _, msg := range replyMsgs {
err := g.cfg.sendToPeerSync(ctx, msg) err := g.sendToPeerSync(ctx, msg)
if err != nil { if err != nil {
return err return err
} }
@@ -1394,7 +1389,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
// Regardless of whether we had any messages to reply with, send over // Regardless of whether we had any messages to reply with, send over
// the sentinel message to signal that the stream has terminated. // the sentinel message to signal that the stream has terminated.
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash, ChainHash: query.ChainHash,
Complete: 1, Complete: 1,
}) })
@@ -1484,7 +1479,7 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
defer g.isSendingBacklog.Store(false) defer g.isSendingBacklog.Store(false)
for _, msg := range newUpdatestoSend { for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(ctx, msg) err := g.sendToPeerSync(ctx, msg)
switch { switch {
case err == ErrGossipSyncerExiting: case err == ErrGossipSyncerExiting:
return return
@@ -1630,7 +1625,7 @@ func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
return return
} }
if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil { if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
log.Errorf("unable to send gossip msgs: %v", err) log.Errorf("unable to send gossip msgs: %v", err)
} }
@@ -1824,3 +1819,43 @@ func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
g.setSyncState(syncingChans) g.setSyncState(syncingChans)
close(req.doneChan) close(req.doneChan)
} }
// sendToPeer sends a variadic number of messages to the remote peer. This
// method should not block while waiting for sends to be written to the wire.
func (g *GossipSyncer) sendToPeer(ctx context.Context,
msgs ...lnwire.Message) error {
return g.sendMsgRateLimited(ctx, false, msgs...)
}
// sendToPeerSync sends a variadic number of messages to the remote peer,
// blocking until all messages have been sent successfully or a write error is
// encountered.
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
msgs ...lnwire.Message) error {
return g.sendMsgRateLimited(ctx, true, msgs...)
}
// sendMsgRateLimited sends a variadic number of messages to the remote peer,
// applying our per-peer rate limit before each send. The sync boolean
// determines if the send is blocking or not.
func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool,
msgs ...lnwire.Message) error {
for _, msg := range msgs {
err := maybeRateLimitMsg(
ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
)
if err != nil {
return err
}
err = g.cfg.sendMsg(ctx, sync, msg)
if err != nil {
return err
}
}
return nil
}

View File

@@ -26,9 +26,14 @@ func TestGossipSyncerSingleBacklogSend(t *testing.T) {
// Create a blocking sendToPeerSync function. We'll use this to simulate // Create a blocking sendToPeerSync function. We'll use this to simulate
// sending a large backlog. // sending a large backlog.
blockingSendChan := make(chan struct{}) blockingSendChan := make(chan struct{})
sendToPeerSync := func(_ context.Context, mockSendMsg := func(_ context.Context, sync bool,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
// Sync is only true when calling `sendToPeerSync`.
if !sync {
return nil
}
// Track that we're in a send goroutine. // Track that we're in a send goroutine.
count := activeGoroutines.Add(1) count := activeGoroutines.Add(1)
totalGoroutinesLaunched.Add(1) totalGoroutinesLaunched.Add(1)
@@ -55,8 +60,8 @@ func TestGossipSyncerSingleBacklogSend(t *testing.T) {
defaultChunkSize, true, true, true, defaultChunkSize, true, true, true,
) )
// Override the sendToPeerSync to use our blocking version. // Override the sendMsg to use our blocking version.
syncer.cfg.sendToPeerSync = sendToPeerSync syncer.cfg.sendMsg = mockSendMsg
syncer.cfg.ignoreHistoricalFilters = false syncer.cfg.ignoreHistoricalFilters = false
syncer.Start() syncer.Start()

View File

@@ -200,13 +200,7 @@ func newTestSyncer(hID lnwire.ShortChannelID,
noSyncChannels: !syncChannels, noSyncChannels: !syncChannels,
noReplyQueries: !replyQueries, noReplyQueries: !replyQueries,
noTimestampQueryOption: !timestamps, noTimestampQueryOption: !timestamps,
sendToPeer: func(_ context.Context, sendMsg: func(_ context.Context, _ bool,
msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
sendToPeerSync: func(_ context.Context,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
msgChan <- msgs msgChan <- msgs
@@ -392,16 +386,21 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
// We'll then instruct the gossiper to filter this set of messages. // We'll then instruct the gossiper to filter this set of messages.
syncer.FilterGossipMsgs(ctx, msgs...) syncer.FilterGossipMsgs(ctx, msgs...)
// Out of all the messages we sent in, we should only get 2 of them // Out of all the messages we sent in, we should only get 3 of them
// back. // back.
select { msgReceived := make([]lnwire.Message, 0, 3)
case <-time.After(time.Second * 15): for {
t.Fatalf("no msgs received") select {
case <-time.After(time.Second * 1):
t.Fatalf("timeout receiving msg, want 3 msgs, got %v "+
"messages: %v", len(msgReceived), msgReceived)
case msgs := <-msgChan: case msgs := <-msgChan:
if len(msgs) != 3 { msgReceived = append(msgReceived, msgs...)
t.Fatalf("expected 3 messages instead got %v "+ }
"messages: %v", len(msgs), spew.Sdump(msgs))
if len(msgReceived) == 3 {
break
} }
} }