From 778456769afbf9af805f476073ce2cec767f5a43 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 22 Jul 2025 19:17:32 +0800 Subject: [PATCH] discovery: rate limiting sending msgs per peer We now add another layer of rate limiting before sending the messages inside `GossipSyncer`. --- discovery/sync_manager.go | 9 +--- discovery/syncer.go | 75 ++++++++++++++++++++++++--------- discovery/syncer_atomic_test.go | 11 +++-- discovery/syncer_test.go | 29 ++++++------- 4 files changed, 79 insertions(+), 45 deletions(-) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 6ffea00cb..2cc1bd949 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -663,15 +663,10 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { encodingType: encoding, chunkSize: encodingTypeToChunkSize[encoding], batchSize: requestBatchSize, - sendToPeer: func(ctx context.Context, + sendMsg: func(ctx context.Context, sync bool, msgs ...lnwire.Message) error { - return m.sendMessages(ctx, false, peer, nodeID, msgs...) - }, - sendToPeerSync: func(ctx context.Context, - msgs ...lnwire.Message) error { - - return m.sendMessages(ctx, true, peer, nodeID, msgs...) + return m.sendMessages(ctx, sync, peer, nodeID, msgs...) }, ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters, bestHeight: m.cfg.BestHeight, diff --git a/discovery/syncer.go b/discovery/syncer.go index 671b75e04..12e7dbf16 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -246,15 +246,10 @@ type gossipSyncerCfg struct { // the remote node in a single QueryShortChanIDs request. batchSize int32 - // sendToPeer sends a variadic number of messages to the remote peer. - // This method should not block while waiting for sends to be written - // to the wire. - sendToPeer func(context.Context, ...lnwire.Message) error - - // sendToPeerSync sends a variadic number of messages to the remote - // peer, blocking until all messages have been sent successfully or a - // write error is encountered. - sendToPeerSync func(context.Context, ...lnwire.Message) error + // sendMsg sends a variadic number of messages to the remote peer. + // The boolean indicates whether this method should be blocked or not + // while waiting for sends to be written to the wire. + sendMsg func(context.Context, bool, ...lnwire.Message) error // noSyncChannels will prevent the GossipSyncer from spawning a // channelGraphSyncer, meaning we will not try to reconcile unknown @@ -518,7 +513,7 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) { // Send the msg to the remote peer, which is non-blocking as // `sendToPeer` only queues the msg in Brontide. - err = g.cfg.sendToPeer(ctx, queryRangeMsg) + err = g.sendToPeer(ctx, queryRangeMsg) if err != nil { log.Errorf("Unable to send chan range query: %v", err) return @@ -805,11 +800,11 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context, TimestampRange: timestampRange, } - if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil { + if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil { return err } - if firstTimestamp == zeroTimestamp && timestampRange == 0 { + if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 { g.localUpdateHorizon = nil } else { g.localUpdateHorizon = localUpdateHorizon @@ -861,7 +856,7 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool { // With our chunk obtained, we'll send over our next query, then return // false indicating that we're net yet fully synced. - err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{ + err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{ ChainHash: g.cfg.chainHash, EncodingType: lnwire.EncodingSortedPlain, ShortChanIDs: queryChunk, @@ -1176,7 +1171,7 @@ func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context, "chain=%v, we're on chain=%v", query.ChainHash, g.cfg.chainHash) - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ + return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ ChainHash: query.ChainHash, FirstBlockHeight: query.FirstBlockHeight, NumBlocks: query.NumBlocks, @@ -1249,7 +1244,7 @@ func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context, ) } - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ + return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{ ChainHash: query.ChainHash, NumBlocks: numBlocks, FirstBlockHeight: firstHeight, @@ -1355,7 +1350,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context, "chain=%v, we're on chain=%v", query.ChainHash, g.cfg.chainHash) - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ + return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 0, }) @@ -1386,7 +1381,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context, // each one individually and synchronously to throttle the sends and // perform buffering of responses in the syncer as opposed to the peer. for _, msg := range replyMsgs { - err := g.cfg.sendToPeerSync(ctx, msg) + err := g.sendToPeerSync(ctx, msg) if err != nil { return err } @@ -1394,7 +1389,7 @@ func (g *GossipSyncer) replyShortChanIDs(ctx context.Context, // Regardless of whether we had any messages to reply with, send over // the sentinel message to signal that the stream has terminated. - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ + return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 1, }) @@ -1484,7 +1479,7 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context, defer g.isSendingBacklog.Store(false) for _, msg := range newUpdatestoSend { - err := g.cfg.sendToPeerSync(ctx, msg) + err := g.sendToPeerSync(ctx, msg) switch { case err == ErrGossipSyncerExiting: return @@ -1630,7 +1625,7 @@ func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context, return } - if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil { + if err = g.sendToPeer(ctx, msgsToSend...); err != nil { log.Errorf("unable to send gossip msgs: %v", err) } @@ -1824,3 +1819,43 @@ func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) { g.setSyncState(syncingChans) close(req.doneChan) } + +// sendToPeer sends a variadic number of messages to the remote peer. This +// method should not block while waiting for sends to be written to the wire. +func (g *GossipSyncer) sendToPeer(ctx context.Context, + msgs ...lnwire.Message) error { + + return g.sendMsgRateLimited(ctx, false, msgs...) +} + +// sendToPeerSync sends a variadic number of messages to the remote peer, +// blocking until all messages have been sent successfully or a write error is +// encountered. +func (g *GossipSyncer) sendToPeerSync(ctx context.Context, + msgs ...lnwire.Message) error { + + return g.sendMsgRateLimited(ctx, true, msgs...) +} + +// sendMsgRateLimited sends a variadic number of messages to the remote peer, +// applying our per-peer rate limit before each send. The sync boolean +// determines if the send is blocking or not. +func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool, + msgs ...lnwire.Message) error { + + for _, msg := range msgs { + err := maybeRateLimitMsg( + ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(), + ) + if err != nil { + return err + } + + err = g.cfg.sendMsg(ctx, sync, msg) + if err != nil { + return err + } + } + + return nil +} diff --git a/discovery/syncer_atomic_test.go b/discovery/syncer_atomic_test.go index 10a6b252c..ea1a60886 100644 --- a/discovery/syncer_atomic_test.go +++ b/discovery/syncer_atomic_test.go @@ -26,9 +26,14 @@ func TestGossipSyncerSingleBacklogSend(t *testing.T) { // Create a blocking sendToPeerSync function. We'll use this to simulate // sending a large backlog. blockingSendChan := make(chan struct{}) - sendToPeerSync := func(_ context.Context, + mockSendMsg := func(_ context.Context, sync bool, msgs ...lnwire.Message) error { + // Sync is only true when calling `sendToPeerSync`. + if !sync { + return nil + } + // Track that we're in a send goroutine. count := activeGoroutines.Add(1) totalGoroutinesLaunched.Add(1) @@ -55,8 +60,8 @@ func TestGossipSyncerSingleBacklogSend(t *testing.T) { defaultChunkSize, true, true, true, ) - // Override the sendToPeerSync to use our blocking version. - syncer.cfg.sendToPeerSync = sendToPeerSync + // Override the sendMsg to use our blocking version. + syncer.cfg.sendMsg = mockSendMsg syncer.cfg.ignoreHistoricalFilters = false syncer.Start() diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 328b3d944..edf91cb69 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -200,13 +200,7 @@ func newTestSyncer(hID lnwire.ShortChannelID, noSyncChannels: !syncChannels, noReplyQueries: !replyQueries, noTimestampQueryOption: !timestamps, - sendToPeer: func(_ context.Context, - msgs ...lnwire.Message) error { - - msgChan <- msgs - return nil - }, - sendToPeerSync: func(_ context.Context, + sendMsg: func(_ context.Context, _ bool, msgs ...lnwire.Message) error { msgChan <- msgs @@ -392,16 +386,21 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { // We'll then instruct the gossiper to filter this set of messages. syncer.FilterGossipMsgs(ctx, msgs...) - // Out of all the messages we sent in, we should only get 2 of them + // Out of all the messages we sent in, we should only get 3 of them // back. - select { - case <-time.After(time.Second * 15): - t.Fatalf("no msgs received") + msgReceived := make([]lnwire.Message, 0, 3) + for { + select { + case <-time.After(time.Second * 1): + t.Fatalf("timeout receiving msg, want 3 msgs, got %v "+ + "messages: %v", len(msgReceived), msgReceived) - case msgs := <-msgChan: - if len(msgs) != 3 { - t.Fatalf("expected 3 messages instead got %v "+ - "messages: %v", len(msgs), spew.Sdump(msgs)) + case msgs := <-msgChan: + msgReceived = append(msgReceived, msgs...) + } + + if len(msgReceived) == 3 { + break } }