discovery: switch to bytes based rate limiting for outbound msgs

In this commit, we revamp the old message-based rate limiting. First, we
move to metering by bytes/s instead of messages/s. The old logic had an
error in that it limited groups of message replies instead of each
individual message. With this new approach, we'll use the newly added
SerializedSize method to implement fine-grained bandwidth metering.

We need to pick two values: the burst rate and the msg bytes rate. The
burst rate is the maximum amount that can be sent in one go. It needs to
be set above 65 KB (the maximum message size), otherwise a max-sized
message could never be sent. The bucket starts with this many tokens
(bytes). As tokens are depleted, the bucket is refilled at the msg bytes
rate.

As conservative values, we've chosen 200 KB as the burst rate and 100
KB/s as the limit.
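
To make the token bucket behaviour concrete, here's a rough, standalone
sketch (illustrative only, not part of this change) that uses
golang.org/x/time/rate with the values chosen above. Back-to-back
reservations for max-sized (65,535 byte) messages fit within the 200 KB
burst at first; once the bucket is drained, later reservations report a
growing delay as tokens refill at 100 KB/s.

    package main

    import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
    )

    func main() {
        const (
            bytesPerSecond = 100 * 1_024     // Refill rate: 100 KB/s.
            burstBytes     = 2 * 100 * 1_024 // Bucket capacity: 200 KB.
            maxMsgSize     = 65_535          // Largest message we might send.
        )

        // The bucket starts full with burstBytes tokens (bytes) and
        // refills at bytesPerSecond tokens per second.
        limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), burstBytes)

        // Reserve tokens for four max-sized messages back to back. The
        // first three fit within the burst (delay ~0); the fourth must
        // wait roughly half a second while the bucket refills.
        for i := 1; i <= 4; i++ {
            res := limiter.ReserveN(time.Now(), maxMsgSize)
            fmt.Printf("msg %d: wait %v\n", i, res.Delay())
        }
    }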
Olaoluwa Osuntokun 2025-03-17 16:41:38 -05:00
parent 67d2eac437
commit 05702d48b2
4 changed files with 380 additions and 268 deletions


@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"sync"
"sync/atomic"
@ -11,6 +12,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"golang.org/x/time/rate"
)
const (
@ -25,6 +27,21 @@ const (
// filterSemaSize is the capacity of gossipFilterSema.
filterSemaSize = 5
// DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
// This is the most that can be sent in a given go. Requests beyond
// this will block indefinitely. Once tokens (bytes) are depleted,
// they'll be refilled at the DefaultMsgBytesPerSecond rate.
DefaultMsgBytesBurst = 2 * 100 * 1_024
// DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
// messages. Once tokens (bytes) have been taken from the bucket,
// they'll be refilled at this rate.
DefaultMsgBytesPerSecond = 100 * 1_024
// assumedMsgSize is the assumed size of a message if we can't compute
// its serialized size. This comes out to 1 KB.
assumedMsgSize = 1_024
)
var (
@ -110,6 +127,15 @@ type SyncManagerCfg struct {
// updates for a channel and returns true if the channel should be
// considered a zombie based on these timestamps.
IsStillZombieChannel func(time.Time, time.Time) bool
// AllotedMsgBytesPerSecond is the allotted bandwidth rate, expressed in
// bytes/second that the gossip manager can consume. Once we exceed this
// rate, message sending will block until we're below the rate.
AllotedMsgBytesPerSecond uint64
// AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
// we've exceeded the hard upper limit.
AllotedMsgBytesBurst uint64
}
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
@ -168,6 +194,12 @@ type SyncManager struct {
// queries.
gossipFilterSema chan struct{}
// rateLimiter dictates the frequency with which we will reply to gossip
// queries from peers. It caps our total outbound gossip bandwidth and is
// used to delay responses to peers to prevent DOS vulnerabilities when
// they spam us with an unreasonable number of queries.
rateLimiter *rate.Limiter
wg sync.WaitGroup
quit chan struct{}
}
@ -180,8 +212,25 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
filterSema <- struct{}{}
}
bytesPerSecond := cfg.AllotedMsgBytesPerSecond
if bytesPerSecond == 0 {
bytesPerSecond = DefaultMsgBytesPerSecond
}
bytesBurst := cfg.AllotedMsgBytesBurst
if bytesBurst == 0 {
bytesBurst = DefaultMsgBytesBurst
}
// We'll use this rate limiter to limit our total outbound bandwidth for
// gossip queries to peers.
rateLimiter := rate.NewLimiter(
rate.Limit(bytesPerSecond), int(bytesBurst),
)
return &SyncManager{
cfg: *cfg,
rateLimiter: rateLimiter,
newSyncers: make(chan *newSyncer),
staleSyncers: make(chan *staleSyncer),
activeSyncers: make(
@ -494,6 +543,95 @@ func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
return isPinnedSyncer
}
// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
) (*rate.Reservation, error) {
var (
msgSize uint32
err error
)
// Figure out the serialized size of the message. If we can't easily
// compute it, then we'll use the assumed msg size.
if sMsg, ok := msg.(lnwire.SizeableMessage); ok {
msgSize, err = sMsg.SerializedSize()
if err != nil {
return nil, err
}
} else {
log.Warnf("Unable to compute serialized size of %T", msg)
msgSize = assumedMsgSize
}
return m.rateLimiter.ReserveN(time.Now(), int(msgSize)), nil
}
// waitMsgDelay takes a rate limiter reservation and waits until its
// delay has elapsed.
func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,
limitReservation *rate.Reservation) error {
// If we've exhausted our bandwidth allotment, we will start to delay
// responses back to the remote peer. This can help prevent DOS attacks
// where the remote peer spams us endlessly.
//
// We skip checking for reservation.OK() here, as during config
// validation, we ensure that the burst is enough for a single message
// to be sent.
delay := limitReservation.Delay()
if delay > 0 {
log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
"responding in %s", peerPub, delay)
select {
case <-time.After(delay):
case <-ctx.Done():
limitReservation.Cancel()
return ErrGossipSyncerExiting
case <-m.quit:
limitReservation.Cancel()
return ErrGossipSyncerExiting
}
}
return nil
}
// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func (m *SyncManager) maybeRateLimitMsg(ctx context.Context, peerPub [33]byte,
msg lnwire.Message) error {
delay, err := m.deriveRateLimitReservation(msg)
if err != nil {
return err
}
return m.waitMsgDelay(ctx, peerPub, delay)
}
// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {
for _, msg := range msgs {
if err := m.maybeRateLimitMsg(ctx, nodeID, msg); err != nil {
return err
}
if err := peer.SendMessageLazy(sync, msg); err != nil {
return err
}
}
return nil
}
// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
nodeID := route.Vertex(peer.PubKey())
@ -507,20 +645,22 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
batchSize: requestBatchSize,
sendToPeer: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(false, msgs...)
sendToPeer: func(ctx context.Context,
msgs ...lnwire.Message) error {
return m.sendMessages(ctx, false, peer, nodeID, msgs...)
},
sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...)
sendToPeerSync: func(ctx context.Context,
msgs ...lnwire.Message) error {
return m.sendMessages(ctx, true, peer, nodeID, msgs...)
},
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies,
delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
noTimestampQueryOption: m.cfg.NoTimestampQueries,
isStillZombieChannel: m.cfg.IsStillZombieChannel,
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
noTimestampQueryOption: m.cfg.NoTimestampQueries,
isStillZombieChannel: m.cfg.IsStillZombieChannel,
}, m.gossipFilterSema)
// Gossip syncers are initialized by default in a PassiveSync type


@ -1,7 +1,9 @@
package discovery
import (
"bytes"
"fmt"
"io"
"reflect"
"sync"
"sync/atomic"
@ -15,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
// randPeer creates a random peer.
@ -685,3 +688,167 @@ func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer)
})
assertSyncerStatus(t, s, chansSynced, PassiveSync)
}
// TestSizeableMessage is a test implementation of lnwire.SizeableMessage.
type TestSizeableMessage struct {
size uint32
sizeErr error
}
// Decode implements the lnwire.Message interface.
func (m *TestSizeableMessage) Decode(r io.Reader, pver uint32) error {
return nil
}
// Encode implements the lnwire.Message interface.
func (m *TestSizeableMessage) Encode(w *bytes.Buffer, pver uint32) error {
return nil
}
// MsgType implements the lnwire.Message interface.
func (m *TestSizeableMessage) MsgType() lnwire.MessageType {
// Use a custom message type for testing purposes.
return lnwire.MessageType(9999)
}
// SerializedSize implements the lnwire.SizeableMessage interface.
func (m *TestSizeableMessage) SerializedSize() (uint32, error) {
if m.sizeErr != nil {
return 0, m.sizeErr
}
return m.size, nil
}
// TestDeriveRateLimitReservation tests that the
// SyncManager.deriveRateLimitReservation method correctly computes delays based
// on message size.
func TestDeriveRateLimitReservation(t *testing.T) {
// Define standard test parameters for rate limiting.
bytesPerSec := uint64(1000) // 1000 bytes/second
bytesBurst := uint64(100) // 100 bytes burst capacity
// Helper to create a fresh SyncManager with a clean rate limiter.
newSyncManager := func(perSec, burst uint64) *SyncManager {
limiter := rate.NewLimiter(rate.Limit(perSec), int(burst))
return &SyncManager{
rateLimiter: limiter,
}
}
// Test sequential messages causing increasing delays - this is the core
// property of the rate limiting mechanism.
t.Run("sequential messages have increasing delays", func(t *testing.T) {
sm := newSyncManager(bytesPerSec, bytesBurst)
// Send a series of messages that exactly consume the burst
// limit to make the behavior predictable.
msg := &TestSizeableMessage{
size: uint32(bytesBurst),
}
// First message should have no delay as it fits within burst.
delay1, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.Equal(
t, time.Duration(0), delay1.Delay(), "first message "+
"should have no delay",
)
// Second message should have a non-zero delay as the token
// bucket is now depleted.
delay2, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.True(
t, delay2.Delay() > 0, "second message should have "+
"non-zero delay, got: %s", delay2,
)
// Third message should have an even longer delay since the
// token bucket is still refilling at a constant rate.
delay3, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.True(t, delay3.Delay() > delay2.Delay(), "third "+
"message should have longer delay than second: %s > %s",
delay3, delay2)
// The expected theoretical delay when sending messages at
// exactly the burst size should be approximately 100ms with our
// parameters.
expectedDelay := time.Duration(
float64(msg.size) / float64(bytesPerSec) * float64(time.Second), //nolint:ll
)
// Check that delays are increasing in a coherent manner.
t.Logf("Delay sequence: %s → %s → %s (expected "+
"theoretical: %s)", delay1.Delay(), delay2.Delay(),
delay3.Delay(), expectedDelay)
})
// Test handling of errors from SerializedSize.
t.Run("propagates serialization errors", func(t *testing.T) {
sm := newSyncManager(bytesPerSec, bytesBurst)
// Create a test message that returns an error from
// SerializedSize.
errMsg := fmt.Errorf("failed to serialize message")
msg := &TestSizeableMessage{
sizeErr: errMsg,
}
// The error should propagate through
// deriveRateLimitReservation.
_, err := sm.deriveRateLimitReservation(msg)
require.Error(t, err)
require.Equal(
t, errMsg, err, "Error should be propagated unchanged",
)
})
// Test that message size affects delay.
t.Run("larger messages have longer delays", func(t *testing.T) {
// Create a rate limiter with a known burst size.
sm := newSyncManager(bytesPerSec, bytesBurst)
// First, empty the token bucket with a message exactly at the
// burst size.
initialMsg := &TestSizeableMessage{
size: uint32(bytesBurst),
}
_, err := sm.deriveRateLimitReservation(initialMsg)
require.NoError(t, err)
// Now send two messages of different sizes and compare their
// delays.
smallMsg := &TestSizeableMessage{
size: 50,
}
largeMsg := &TestSizeableMessage{
size: 200,
}
// Send the small message first.
smallDelay, err := sm.deriveRateLimitReservation(smallMsg)
require.NoError(t, err)
// Reset the limiter to the same state, then empty the bucket.
sm.rateLimiter = rate.NewLimiter(
rate.Limit(bytesPerSec), int(bytesBurst),
)
_, err = sm.deriveRateLimitReservation(initialMsg)
require.NoError(t, err)
// Now send the large message.
largeDelay, err := sm.deriveRateLimitReservation(largeMsg)
require.NoError(t, err)
// The large message should have a longer delay than the small
// one.
require.True(
t, largeDelay.Delay() > smallDelay.Delay(),
"large message (size %d) should have longer delay "+
"(%s) than small message (size %d, delay %s)",
largeMsg.size, largeDelay.Delay(), smallMsg.size,
smallDelay.Delay())
})
}


@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"fmt"
"math"
@ -11,11 +12,11 @@ import (
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate"
)
// SyncerType encapsulates the different types of syncing mechanisms for a
@ -152,15 +153,6 @@ func (s syncerState) String() string {
}
const (
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
// will respond to immediately before starting to delay responses.
DefaultMaxUndelayedQueryReplies = 10
// DefaultDelayedQueryReplyInterval is the length of time we will wait
// before responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
DefaultDelayedQueryReplyInterval = 5 * time.Second
// maxQueryChanRangeReplies specifies the default limit of replies to
// process for a single QueryChannelRange request.
maxQueryChanRangeReplies = 500
@ -250,21 +242,12 @@ type gossipSyncerCfg struct {
// sendToPeer sends a variadic number of messages to the remote peer.
// This method should not block while waiting for sends to be written
// to the wire.
sendToPeer func(...lnwire.Message) error
sendToPeer func(context.Context, ...lnwire.Message) error
// sendToPeerSync sends a variadic number of messages to the remote
// peer, blocking until all messages have been sent successfully or a
// write error is encountered.
sendToPeerSync func(...lnwire.Message) error
// maxUndelayedQueryReplies specifies how many gossip queries we will
// respond to immediately before starting to delay responses.
maxUndelayedQueryReplies int
// delayedQueryReplyInterval is the length of time we will wait before
// responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
delayedQueryReplyInterval time.Duration
sendToPeerSync func(context.Context, ...lnwire.Message) error
// noSyncChannels will prevent the GossipSyncer from spawning a
// channelGraphSyncer, meaning we will not try to reconcile unknown
@ -390,12 +373,6 @@ type GossipSyncer struct {
cfg gossipSyncerCfg
// rateLimiter dictates the frequency with which we will reply to gossip
// queries from a peer. This is used to delay responses to peers to
// prevent DOS vulnerabilities if they are spamming with an unreasonable
// number of queries.
rateLimiter *rate.Limiter
// syncedSignal is a channel that, if set, will be closed when the
// GossipSyncer reaches its terminal chansSynced state.
syncedSignal chan struct{}
@ -406,43 +383,23 @@ type GossipSyncer struct {
sync.Mutex
quit chan struct{}
wg sync.WaitGroup
// cg is a helper that encapsulates a wait group and quit channel and
// allows contexts that either block or cancel on those depending on
// the use case.
cg *fn.ContextGuard
}
// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
// If no parameter was specified for max undelayed query replies, set it
// to the default of 5 queries.
if cfg.maxUndelayedQueryReplies <= 0 {
cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
}
// If no parameter was specified for delayed query reply interval, set
// to the default of 5 seconds.
if cfg.delayedQueryReplyInterval <= 0 {
cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
}
// Construct a rate limiter that will govern how frequently we reply to
// gossip queries from this peer. The limiter will automatically adjust
// during periods of quiescence, and increase the reply interval under
// load.
interval := rate.Every(cfg.delayedQueryReplyInterval)
rateLimiter := rate.NewLimiter(
interval, cfg.maxUndelayedQueryReplies,
)
return &GossipSyncer{
cfg: cfg,
rateLimiter: rateLimiter,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
syncerSema: sema,
quit: make(chan struct{}),
cg: fn.NewContextGuard(),
}
}
@ -456,11 +413,11 @@ func (g *GossipSyncer) Start() {
// supports gossip queries, and only spawn replyHandler if we
// advertise support
if !g.cfg.noSyncChannels {
g.wg.Add(1)
g.cg.WgAdd(1)
go g.channelGraphSyncer()
}
if !g.cfg.noReplyQueries {
g.wg.Add(1)
g.cg.WgAdd(1)
go g.replyHandler()
}
})
@ -473,8 +430,7 @@ func (g *GossipSyncer) Stop() {
log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
close(g.quit)
g.wg.Wait()
g.cg.Quit()
})
}
@ -500,7 +456,8 @@ func (g *GossipSyncer) handleSyncingChans() {
// Send the msg to the remote peer, which is non-blocking as
// `sendToPeer` only queues the msg in Brontide.
err = g.cfg.sendToPeer(queryRangeMsg)
ctx, _ := g.cg.Create(context.Background())
err = g.cfg.sendToPeer(ctx, queryRangeMsg)
if err != nil {
log.Errorf("Unable to send chan range query: %v", err)
return
@ -515,7 +472,7 @@ func (g *GossipSyncer) handleSyncingChans() {
// properly channel graph state with the remote peer, and also that we only
// send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer() {
defer g.wg.Done()
defer g.cg.WgDone()
for {
state := g.syncState()
@ -563,7 +520,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Warnf("Unexpected message: %T in state=%v",
msg, state)
case <-g.quit:
case <-g.cg.Done():
return
}
@ -613,7 +570,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Warnf("Unexpected message: %T in state=%v",
msg, state)
case <-g.quit:
case <-g.cg.Done():
return
}
@ -659,7 +616,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
case req := <-g.historicalSyncReqs:
g.handleHistoricalSync(req)
case <-g.quit:
case <-g.cg.Done():
return
}
}
@ -674,7 +631,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
defer g.wg.Done()
defer g.cg.WgDone()
for {
select {
@ -692,7 +649,7 @@ func (g *GossipSyncer) replyHandler() {
"query: %v", err)
}
case <-g.quit:
case <-g.cg.Done():
return
}
}
@ -716,7 +673,8 @@ func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
TimestampRange: timestampRange,
}
if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
ctx, _ := g.cg.Create(context.Background())
if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil {
return err
}
@ -772,7 +730,8 @@ func (g *GossipSyncer) synchronizeChanIDs() bool {
// With our chunk obtained, we'll send over our next query, then return
// false indicating that we're not yet fully synced.
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
ChainHash: g.cfg.chainHash,
EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: queryChunk,
@ -1047,23 +1006,6 @@ func (g *GossipSyncer) genChanRangeQuery(
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
reservation := g.rateLimiter.Reserve()
delay := reservation.Delay()
// If we've already replied a handful of times, we will start to delay
// responses back to the remote peer. This can help prevent DOS attacks
// where the remote peer spams us endlessly.
if delay > 0 {
log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
"responding in %s", g.cfg.peerPub[:], delay)
select {
case <-time.After(delay):
case <-g.quit:
return ErrGossipSyncerExiting
}
}
switch msg := msg.(type) {
// In this state, we'll also handle any incoming channel range queries
@ -1096,7 +1038,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
"chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash)
return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash,
FirstBlockHeight: query.FirstBlockHeight,
NumBlocks: query.NumBlocks,
@ -1169,7 +1113,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
)
}
return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash,
NumBlocks: numBlocks,
FirstBlockHeight: firstHeight,
@ -1273,7 +1219,9 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
"chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash)
return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 0,
})
@ -1304,7 +1252,8 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
// each one individually and synchronously to throttle the sends and
// perform buffering of responses in the syncer as opposed to the peer.
for _, msg := range replyMsgs {
err := g.cfg.sendToPeerSync(msg)
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeerSync(ctx, msg)
if err != nil {
return err
}
@ -1312,7 +1261,9 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
// Regardless of whether we had any messages to reply with, send over
// the sentinel message to signal that the stream has terminated.
return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
@ -1341,7 +1292,7 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
select {
case <-g.syncerSema:
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
@ -1372,13 +1323,14 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
}
// We'll conclude by launching a goroutine to send out any updates.
g.wg.Add(1)
g.cg.WgAdd(1)
go func() {
defer g.wg.Done()
defer g.cg.WgDone()
defer returnSema()
for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(msg)
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeerSync(ctx, msg)
switch {
case err == ErrGossipSyncerExiting:
return
@ -1411,7 +1363,7 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// If we've been signaled to exit, or are exiting, then we'll stop
// short.
select {
case <-g.quit:
case <-g.cg.Done():
return
default:
}
@ -1520,7 +1472,11 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
return
}
g.cfg.sendToPeer(msgsToSend...)
ctx, _ := g.cg.Create(context.Background())
if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil {
log.Errorf("unable to send gossip msgs: %v", err)
}
}
// ProcessQueryMsg is used by outside callers to pass new channel time series
@ -1553,7 +1509,7 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
select {
case msgChan <- msg:
case <-peerQuit:
case <-g.quit:
case <-g.cg.Done():
}
return nil
@ -1601,14 +1557,14 @@ func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
select {
case err := <-errChan:
return err
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
}
@ -1687,14 +1643,14 @@ func (g *GossipSyncer) historicalSync() error {
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
case <-g.cg.Done():
return ErrGossiperShuttingDown
}
select {
case <-done:
return nil
case <-g.quit:
case <-g.cg.Done():
return ErrGossiperShuttingDown
}
}


@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"fmt"
"math"
@ -196,15 +197,18 @@ func newTestSyncer(hID lnwire.ShortChannelID,
noSyncChannels: !syncChannels,
noReplyQueries: !replyQueries,
noTimestampQueryOption: !timestamps,
sendToPeer: func(msgs ...lnwire.Message) error {
sendToPeer: func(_ context.Context,
msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
sendToPeerSync: func(msgs ...lnwire.Message) error {
sendToPeerSync: func(_ context.Context,
msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
delayedQueryReplyInterval: 2 * time.Second,
bestHeight: func() uint32 {
return latestKnownHeight
},
@ -1557,161 +1561,6 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
}
}
// TestGossipSyncerDelayDOS tests that the gossip syncer will begin delaying
// queries after its prescribed allotment of undelayed query responses. Once
// this happens, all query replies should be delayed by the configured
// interval.
func TestGossipSyncerDelayDOS(t *testing.T) {
t.Parallel()
// We'll modify the chunk size to be a smaller value, since we'll be
// sending a modest number of queries. After exhausting our undelayed
// gossip queries, we'll send two extra queries and ensure that they are
// delayed properly.
const chunkSize = 2
const numDelayedQueries = 2
const delayTolerance = time.Millisecond * 200
// First, we'll create two GossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
highestID := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start()
defer syncer2.Stop()
// Record the delayed query reply interval used by each syncer.
delayedQueryInterval := syncer1.cfg.delayedQueryReplyInterval
// Record the number of undelayed queries allowed by the syncers.
numUndelayedQueries := syncer1.cfg.maxUndelayedQueryReplies
// We will send enough queries to exhaust the undelayed responses, and
// then send two more queries which should be delayed. An additional one
// is subtracted from the total since one undelayed message will be consumed
// by the initial QueryChannelRange.
numQueryResponses := numUndelayedQueries + numDelayedQueries - 1
// The total number of responses must include the initial reply each
// syncer will make to QueryChannelRange.
numTotalQueries := 1 + numQueryResponses
// The total number of channels each syncer needs to request must be
// scaled by the chunk size being used.
numTotalChans := numQueryResponses * chunkSize
// Construct enough channels so that all of the queries will have enough
// channels. Since syncer1 won't know of any channels, their sets are
// inherently disjoint.
var syncer2Chans []lnwire.ShortChannelID
for i := 0; i < numTotalChans; i++ {
syncer2Chans = append([]lnwire.ShortChannelID{
{
BlockHeight: highestID.BlockHeight - uint32(i) - 1,
TxIndex: uint32(i),
},
}, syncer2Chans...)
}
// We'll kick off the test by asserting syncer1 sends over the
// QueryChannelRange message the other node.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.queryMsgs <- msg:
}
}
}
// At this point, we'll need a response from syncer2's channel
// series. This will cause syncer1 to simply request the entire set of
// channels from syncer2. This will count as the first undelayed
// response for syncer2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterRangeReqs:
// We'll send back all the channels that it should know of.
chanSeries2.filterRangeResp <- syncer2Chans
}
// At this point, we'll assert that the ReplyChannelRange message is
// sent by syncer2.
for i := 0; i < numQueryResponses; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// We'll now have syncer1 process the received sids from syncer2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans
}
// At this point, syncer1 should start to send out initial requests to
// query the chan IDs of the remote party. We'll keep track of the
// number of queries made using the iterated value, which starts at one
// due to the initial contribution of the QueryChannelRange msgs.
for i := 1; i < numTotalQueries; i++ {
expDelayResponse := i >= numUndelayedQueries
queryBatch(t,
msgChan1, msgChan2,
syncer1, syncer2,
chanSeries2,
expDelayResponse,
delayedQueryInterval,
delayTolerance,
)
}
}
// queryBatch is a helper method that will query for a single batch of channels
// from a peer and assert the responses. The method can also be used to assert
// the same transition happens, but is delayed by the remote peer's DOS