Merge pull request #9607 from Roasbeef/unified-gossip-limiter

discovery: unify rate.Limiter across all gossip peers
This commit is contained in:
Olaoluwa Osuntokun 2025-03-24 19:22:16 -07:00 committed by GitHub
commit 1cebfedbae
10 changed files with 432 additions and 278 deletions

View File

@ -707,6 +707,8 @@ func DefaultConfig() Config {
ChannelUpdateInterval: discovery.DefaultChannelUpdateInterval,
SubBatchDelay: discovery.DefaultSubBatchDelay,
AnnouncementConf: discovery.DefaultProofMatureDelta,
MsgRateBytes: discovery.DefaultMsgBytesPerSecond,
MsgBurstBytes: discovery.DefaultMsgBytesBurst,
},
Invoices: &lncfg.Invoices{
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,

View File

@ -390,6 +390,14 @@ type Config struct {
// spent-ness of channel outpoints. For neutrino, this saves long
// rescans from blocking initial usage of the daemon.
AssumeChannelValid bool
// MsgRateBytes is the rate limit for the number of bytes per second
// that we'll allocate to outbound gossip messages.
MsgRateBytes uint64
// MsgBurstBytes is the allotted burst amount in bytes. This is the
// number of starting tokens in our token bucket algorithm.
MsgBurstBytes uint64
}
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@ -574,16 +582,18 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
NoTimestampQueries: cfg.NoTimestampQueries,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
BestHeight: gossiper.latestHeight,
PinnedSyncers: cfg.PinnedSyncers,
IsStillZombieChannel: cfg.IsStillZombieChannel,
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
NoTimestampQueries: cfg.NoTimestampQueries,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
BestHeight: gossiper.latestHeight,
PinnedSyncers: cfg.PinnedSyncers,
IsStillZombieChannel: cfg.IsStillZombieChannel,
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
})
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{

View File

@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"sync"
"sync/atomic"
@ -11,6 +12,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"golang.org/x/time/rate"
)
const (
@ -25,6 +27,21 @@ const (
// filterSemaSize is the capacity of gossipFilterSema.
filterSemaSize = 5
// DefaultMsgBytesBurst is the allotted burst, in bytes, that we'll
// permit. This is the most that can be sent in a single go. Requests
// beyond this will block indefinitely. Once tokens (bytes) are
// depleted, they'll be refilled at the DefaultMsgBytesPerSecond rate.
DefaultMsgBytesBurst = 2 * 100 * 1_024
// DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
// messages. Once tokens (bytes) have been taken from the bucket,
// they'll be refilled at this rate.
DefaultMsgBytesPerSecond = 100 * 1_024
// assumedMsgSize is the assumed size of a message if we can't compute
// its serialized size. This comes out to 1 KB.
assumedMsgSize = 1_024
)
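A rough, standalone sketch (illustration only, not part of this change) of how these defaults behave when fed to golang.org/x/time/rate: the bucket starts with roughly 200 KiB of tokens and refills at roughly 100 KiB/s, so a full-burst reservation is immediate while each additional 100 KiB waits about one more second.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const (
		msgBytesPerSecond = 100 * 1_024     // mirrors DefaultMsgBytesPerSecond
		msgBytesBurst     = 2 * 100 * 1_024 // mirrors DefaultMsgBytesBurst
	)

	limiter := rate.NewLimiter(rate.Limit(msgBytesPerSecond), msgBytesBurst)

	// The first reservation drains the entire burst and incurs no delay.
	r1 := limiter.ReserveN(time.Now(), msgBytesBurst)
	fmt.Println(r1.Delay()) // ~0s

	// The bucket is now empty, so the next 100 KiB must wait for refill.
	r2 := limiter.ReserveN(time.Now(), msgBytesPerSecond)
	fmt.Println(r2.Delay()) // ~1s
}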
var (
@ -110,6 +127,15 @@ type SyncManagerCfg struct {
// updates for a channel and returns true if the channel should be
// considered a zombie based on these timestamps.
IsStillZombieChannel func(time.Time, time.Time) bool
// AllotedMsgBytesPerSecond is the allotted bandwidth rate, expressed
// in bytes/second, that the gossip manager can consume. Once we exceed
// this rate, message sending will block until we're back below it.
AllotedMsgBytesPerSecond uint64
// AllotedMsgBytesBurst is the burst amount, in bytes, that we'll
// permit. This is the number of starting tokens in the token bucket,
// and therefore the most that can be sent in a single go.
AllotedMsgBytesBurst uint64
}
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
@ -168,6 +194,12 @@ type SyncManager struct {
// queries.
gossipFilterSema chan struct{}
// rateLimiter caps the total outbound bandwidth we'll spend replying
// to gossip queries. It is shared by all gossip syncers, so the limit
// applies across all peers collectively rather than per peer, and is
// used to delay replies to help prevent DOS attacks from peers sending
// an unreasonable number of queries.
rateLimiter *rate.Limiter
wg sync.WaitGroup
quit chan struct{}
}
@ -180,8 +212,25 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
filterSema <- struct{}{}
}
bytesPerSecond := cfg.AllotedMsgBytesPerSecond
if bytesPerSecond == 0 {
bytesPerSecond = DefaultMsgBytesPerSecond
}
bytesBurst := cfg.AllotedMsgBytesBurst
if bytesBurst == 0 {
bytesBurst = DefaultMsgBytesBurst
}
// We'll use this rate limiter to cap the total outbound bandwidth we
// spend on gossip query replies, shared across all peers.
rateLimiter := rate.NewLimiter(
rate.Limit(bytesPerSecond), int(bytesBurst),
)
return &SyncManager{
cfg: *cfg,
rateLimiter: rateLimiter,
newSyncers: make(chan *newSyncer),
staleSyncers: make(chan *staleSyncer),
activeSyncers: make(
@ -494,6 +543,95 @@ func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
return isPinnedSyncer
}
// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
) (*rate.Reservation, error) {
var (
msgSize uint32
err error
)
// Figure out the serialized size of the message. If we can't easily
// compute it, then we'll use the assumed msg size.
if sMsg, ok := msg.(lnwire.SizeableMessage); ok {
msgSize, err = sMsg.SerializedSize()
if err != nil {
return nil, err
}
} else {
log.Warnf("Unable to compute serialized size of %T", msg)
msgSize = assumedMsgSize
}
return m.rateLimiter.ReserveN(time.Now(), int(msgSize)), nil
}
// waitMsgDelay takes a rate limiter reservation, and waits until any
// delay it prescribes has elapsed.
func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,
limitReservation *rate.Reservation) error {
// If the reservation carries a non-zero delay, our shared byte budget
// has been exhausted, so we'll delay this response back to the remote
// peer. This can help prevent DOS attacks where the remote peer spams
// us endlessly.
//
// We skip checking for reservation.OK() here, as during config
// validation, we ensure that the burst is enough for a single message
// to be sent.
delay := limitReservation.Delay()
if delay > 0 {
log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
"responding in %s", peerPub, delay)
select {
case <-time.After(delay):
case <-ctx.Done():
limitReservation.Cancel()
return ErrGossipSyncerExiting
case <-m.quit:
limitReservation.Cancel()
return ErrGossipSyncerExiting
}
}
return nil
}
// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func (m *SyncManager) maybeRateLimitMsg(ctx context.Context, peerPub [33]byte,
msg lnwire.Message) error {
delay, err := m.deriveRateLimitReservation(msg)
if err != nil {
return err
}
return m.waitMsgDelay(ctx, peerPub, delay)
}
// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {
for _, msg := range msgs {
if err := m.maybeRateLimitMsg(ctx, nodeID, msg); err != nil {
return err
}
if err := peer.SendMessageLazy(sync, msg); err != nil {
return err
}
}
return nil
}
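To illustrate the unification this commit is named for (a minimal standalone sketch with hypothetical peer names, not part of the diff): because every syncer's sendToPeer/sendToPeerSync closure funnels through SyncManager.sendMessages above, all peers draw from one shared limiter, so the configured bytes/second acts as a global outbound budget rather than a per-peer one.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// One limiter shared by every peer, mirroring SyncManager.rateLimiter.
	shared := rate.NewLimiter(rate.Limit(1_000), 1_000) // 1 KB/s, 1 KB burst

	send := func(peer string, msgSize int) {
		// Reserve bytes from the shared bucket and honor any delay,
		// loosely analogous to deriveRateLimitReservation followed by
		// waitMsgDelay.
		res := shared.ReserveN(time.Now(), msgSize)
		time.Sleep(res.Delay())
		fmt.Printf("sent %d bytes to %s\n", msgSize, peer)
	}

	start := time.Now()
	send("peerA", 1_000) // consumes the whole burst immediately
	send("peerB", 1_000) // waits ~1s even though it's a different peer
	fmt.Println("elapsed:", time.Since(start).Round(time.Second))
}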
// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
nodeID := route.Vertex(peer.PubKey())
@ -507,20 +645,22 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
batchSize: requestBatchSize,
sendToPeer: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(false, msgs...)
sendToPeer: func(ctx context.Context,
msgs ...lnwire.Message) error {
return m.sendMessages(ctx, false, peer, nodeID, msgs...)
},
sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...)
sendToPeerSync: func(ctx context.Context,
msgs ...lnwire.Message) error {
return m.sendMessages(ctx, true, peer, nodeID, msgs...)
},
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies,
delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
noTimestampQueryOption: m.cfg.NoTimestampQueries,
isStillZombieChannel: m.cfg.IsStillZombieChannel,
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies,
noTimestampQueryOption: m.cfg.NoTimestampQueries,
isStillZombieChannel: m.cfg.IsStillZombieChannel,
}, m.gossipFilterSema)
// Gossip syncers are initialized by default in a PassiveSync type

View File

@ -1,7 +1,9 @@
package discovery
import (
"bytes"
"fmt"
"io"
"reflect"
"sync"
"sync/atomic"
@ -15,6 +17,7 @@ import (
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
// randPeer creates a random peer.
@ -685,3 +688,167 @@ func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer)
})
assertSyncerStatus(t, s, chansSynced, PassiveSync)
}
// TestSizeableMessage is a test implementation of lnwire.SizeableMessage.
type TestSizeableMessage struct {
size uint32
sizeErr error
}
// Decode implements the lnwire.Message interface.
func (m *TestSizeableMessage) Decode(r io.Reader, pver uint32) error {
return nil
}
// Encode implements the lnwire.Message interface.
func (m *TestSizeableMessage) Encode(w *bytes.Buffer, pver uint32) error {
return nil
}
// MsgType implements the lnwire.Message interface.
func (m *TestSizeableMessage) MsgType() lnwire.MessageType {
// Use a custom message type for testing purposes.
return lnwire.MessageType(9999)
}
// SerializedSize implements the lnwire.SizeableMessage interface.
func (m *TestSizeableMessage) SerializedSize() (uint32, error) {
if m.sizeErr != nil {
return 0, m.sizeErr
}
return m.size, nil
}
// TestDeriveRateLimitReservation tests that the
// SyncManager.deriveRateLimitReservation method correctly computes delays based
// on message size.
func TestDeriveRateLimitReservation(t *testing.T) {
// Define standard test parameters for rate limiting.
bytesPerSec := uint64(1000) // 1000 bytes/second
bytesBurst := uint64(100) // 100 bytes burst capacity
// Helper to create a fresh SyncManager with a clean rate limiter.
newSyncManager := func(perSec, burst uint64) *SyncManager {
limiter := rate.NewLimiter(rate.Limit(perSec), int(burst))
return &SyncManager{
rateLimiter: limiter,
}
}
// Test sequential messages causing increasing delays - this is the core
// property of the rate limiting mechanism.
t.Run("sequential messages have increasing delays", func(t *testing.T) {
sm := newSyncManager(bytesPerSec, bytesBurst)
// Send a series of messages that exactly consume the burst
// limit to make the behavior predictable.
msg := &TestSizeableMessage{
size: uint32(bytesBurst),
}
// First message should have no delay as it fits within burst.
delay1, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.Equal(
t, time.Duration(0), delay1.Delay(), "first message "+
"should have no delay",
)
// Second message should have a non-zero delay as the token
// bucket is now depleted.
delay2, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.True(
t, delay2.Delay() > 0, "second message should have "+
"non-zero delay, got: %s", delay2,
)
// Third message should have an even longer delay since the
// token bucket is still refilling at a constant rate.
delay3, err := sm.deriveRateLimitReservation(msg)
require.NoError(t, err)
require.True(t, delay3.Delay() > delay2.Delay(), "third "+
"message should have longer delay than second: %s > %s",
delay3, delay2)
// The expected theoretical delay when sending messages at
// exactly the burst size should be approximately 100ms with our
// parameters.
expectedDelay := time.Duration(
float64(msg.size) / float64(bytesPerSec) * float64(time.Second), //nolint:ll
)
// Check that delays are increasing in a coherent manner.
t.Logf("Delay sequence: %s → %s → %s (expected "+
"theoretical: %s)", delay1.Delay(), delay2.Delay(),
delay3.Delay(), expectedDelay)
})
// Test handling of errors from SerializedSize.
t.Run("propagates serialization errors", func(t *testing.T) {
sm := newSyncManager(bytesPerSec, bytesBurst)
// Create a test message that returns an error from
// SerializedSize.
errMsg := fmt.Errorf("failed to serialize message")
msg := &TestSizeableMessage{
sizeErr: errMsg,
}
// The error should propagate through
// deriveRateLimitReservation.
_, err := sm.deriveRateLimitReservation(msg)
require.Error(t, err)
require.Equal(
t, errMsg, err, "Error should be propagated unchanged",
)
})
// Test that message size affects delay.
t.Run("larger messages have longer delays", func(t *testing.T) {
// Create a rate limiter with a known burst size.
sm := newSyncManager(bytesPerSec, bytesBurst)
// First, empty the token bucket with a message exactly at the
// burst size.
initialMsg := &TestSizeableMessage{
size: uint32(bytesBurst),
}
_, err := sm.deriveRateLimitReservation(initialMsg)
require.NoError(t, err)
// Now send two messages of different sizes and compare their
// delays.
smallMsg := &TestSizeableMessage{
size: 50,
}
largeMsg := &TestSizeableMessage{
size: 200,
}
// Send the small message first.
smallDelay, err := sm.deriveRateLimitReservation(smallMsg)
require.NoError(t, err)
// Reset the limiter to the same state, then empty the bucket.
sm.rateLimiter = rate.NewLimiter(
rate.Limit(bytesPerSec), int(bytesBurst),
)
_, err = sm.deriveRateLimitReservation(initialMsg)
require.NoError(t, err)
// Now send the large message.
largeDelay, err := sm.deriveRateLimitReservation(largeMsg)
require.NoError(t, err)
// The large message should have a longer delay than the small
// one.
require.True(
t, largeDelay.Delay() > smallDelay.Delay(),
"large message (size %d) should have longer delay "+
"(%s) than small message (size %d, delay %s)",
largeMsg.size, largeDelay.Delay(), smallMsg.size,
smallDelay.Delay())
})
}

View File

@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"fmt"
"math"
@ -11,11 +12,11 @@ import (
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate"
)
// SyncerType encapsulates the different types of syncing mechanisms for a
@ -152,15 +153,6 @@ func (s syncerState) String() string {
}
const (
// DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
// will respond to immediately before starting to delay responses.
DefaultMaxUndelayedQueryReplies = 10
// DefaultDelayedQueryReplyInterval is the length of time we will wait
// before responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
DefaultDelayedQueryReplyInterval = 5 * time.Second
// maxQueryChanRangeReplies specifies the default limit of replies to
// process for a single QueryChannelRange request.
maxQueryChanRangeReplies = 500
@ -250,21 +242,12 @@ type gossipSyncerCfg struct {
// sendToPeer sends a variadic number of messages to the remote peer.
// This method should not block while waiting for sends to be written
// to the wire.
sendToPeer func(...lnwire.Message) error
sendToPeer func(context.Context, ...lnwire.Message) error
// sendToPeerSync sends a variadic number of messages to the remote
// peer, blocking until all messages have been sent successfully or a
// write error is encountered.
sendToPeerSync func(...lnwire.Message) error
// maxUndelayedQueryReplies specifies how many gossip queries we will
// respond to immediately before starting to delay responses.
maxUndelayedQueryReplies int
// delayedQueryReplyInterval is the length of time we will wait before
// responding to gossip queries after replying to
// maxUndelayedQueryReplies queries.
delayedQueryReplyInterval time.Duration
sendToPeerSync func(context.Context, ...lnwire.Message) error
// noSyncChannels will prevent the GossipSyncer from spawning a
// channelGraphSyncer, meaning we will not try to reconcile unknown
@ -390,12 +373,6 @@ type GossipSyncer struct {
cfg gossipSyncerCfg
// rateLimiter dictates the frequency with which we will reply to gossip
// queries from a peer. This is used to delay responses to peers to
// prevent DOS vulnerabilities if they are spamming with an unreasonable
// number of queries.
rateLimiter *rate.Limiter
// syncedSignal is a channel that, if set, will be closed when the
// GossipSyncer reaches its terminal chansSynced state.
syncedSignal chan struct{}
@ -406,43 +383,23 @@ type GossipSyncer struct {
sync.Mutex
quit chan struct{}
wg sync.WaitGroup
// cg is a helper that encapsulates a wait group and quit channel and
// allows contexts that either block or cancel on those depending on
// the use case.
cg *fn.ContextGuard
}
// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
// If no parameter was specified for max undelayed query replies, set it
// to the default of 5 queries.
if cfg.maxUndelayedQueryReplies <= 0 {
cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
}
// If no parameter was specified for delayed query reply interval, set
// to the default of 5 seconds.
if cfg.delayedQueryReplyInterval <= 0 {
cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
}
// Construct a rate limiter that will govern how frequently we reply to
// gossip queries from this peer. The limiter will automatically adjust
// during periods of quiescence, and increase the reply interval under
// load.
interval := rate.Every(cfg.delayedQueryReplyInterval)
rateLimiter := rate.NewLimiter(
interval, cfg.maxUndelayedQueryReplies,
)
return &GossipSyncer{
cfg: cfg,
rateLimiter: rateLimiter,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
syncerSema: sema,
quit: make(chan struct{}),
cg: fn.NewContextGuard(),
}
}
@ -456,11 +413,11 @@ func (g *GossipSyncer) Start() {
// supports gossip queries, and only spawn replyHandler if we
// advertise support
if !g.cfg.noSyncChannels {
g.wg.Add(1)
g.cg.WgAdd(1)
go g.channelGraphSyncer()
}
if !g.cfg.noReplyQueries {
g.wg.Add(1)
g.cg.WgAdd(1)
go g.replyHandler()
}
})
@ -473,8 +430,7 @@ func (g *GossipSyncer) Stop() {
log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
close(g.quit)
g.wg.Wait()
g.cg.Quit()
})
}
@ -500,7 +456,8 @@ func (g *GossipSyncer) handleSyncingChans() {
// Send the msg to the remote peer, which is non-blocking as
// `sendToPeer` only queues the msg in Brontide.
err = g.cfg.sendToPeer(queryRangeMsg)
ctx, _ := g.cg.Create(context.Background())
err = g.cfg.sendToPeer(ctx, queryRangeMsg)
if err != nil {
log.Errorf("Unable to send chan range query: %v", err)
return
@ -515,7 +472,7 @@ func (g *GossipSyncer) handleSyncingChans() {
// properly channel graph state with the remote peer, and also that we only
// send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer() {
defer g.wg.Done()
defer g.cg.WgDone()
for {
state := g.syncState()
@ -563,7 +520,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Warnf("Unexpected message: %T in state=%v",
msg, state)
case <-g.quit:
case <-g.cg.Done():
return
}
@ -613,7 +570,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
log.Warnf("Unexpected message: %T in state=%v",
msg, state)
case <-g.quit:
case <-g.cg.Done():
return
}
@ -659,7 +616,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
case req := <-g.historicalSyncReqs:
g.handleHistoricalSync(req)
case <-g.quit:
case <-g.cg.Done():
return
}
}
@ -674,7 +631,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
defer g.wg.Done()
defer g.cg.WgDone()
for {
select {
@ -692,7 +649,7 @@ func (g *GossipSyncer) replyHandler() {
"query: %v", err)
}
case <-g.quit:
case <-g.cg.Done():
return
}
}
@ -716,7 +673,8 @@ func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
TimestampRange: timestampRange,
}
if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
ctx, _ := g.cg.Create(context.Background())
if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil {
return err
}
@ -772,7 +730,8 @@ func (g *GossipSyncer) synchronizeChanIDs() bool {
// With our chunk obtained, we'll send over our next query, then return
// false indicating that we're not yet fully synced.
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
ChainHash: g.cfg.chainHash,
EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: queryChunk,
@ -1047,23 +1006,6 @@ func (g *GossipSyncer) genChanRangeQuery(
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
reservation := g.rateLimiter.Reserve()
delay := reservation.Delay()
// If we've already replied a handful of times, we will start to delay
// responses back to the remote peer. This can help prevent DOS attacks
// where the remote peer spams us endlessly.
if delay > 0 {
log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
"responding in %s", g.cfg.peerPub[:], delay)
select {
case <-time.After(delay):
case <-g.quit:
return ErrGossipSyncerExiting
}
}
switch msg := msg.(type) {
// In this state, we'll also handle any incoming channel range queries
@ -1096,7 +1038,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
"chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash)
return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash,
FirstBlockHeight: query.FirstBlockHeight,
NumBlocks: query.NumBlocks,
@ -1169,7 +1113,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro
)
}
return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
ChainHash: query.ChainHash,
NumBlocks: numBlocks,
FirstBlockHeight: firstHeight,
@ -1273,7 +1219,9 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
"chain=%v, we're on chain=%v", query.ChainHash,
g.cfg.chainHash)
return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 0,
})
@ -1304,7 +1252,8 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
// each one individually and synchronously to throttle the sends and
// perform buffering of responses in the syncer as opposed to the peer.
for _, msg := range replyMsgs {
err := g.cfg.sendToPeerSync(msg)
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeerSync(ctx, msg)
if err != nil {
return err
}
@ -1312,7 +1261,9 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
// Regardless of whether we had any messages to reply with, send over
// the sentinel message to signal that the stream has terminated.
return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
ctx, _ := g.cg.Create(context.Background())
return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
ChainHash: query.ChainHash,
Complete: 1,
})
@ -1341,7 +1292,7 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
select {
case <-g.syncerSema:
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
@ -1372,13 +1323,14 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
}
// We'll conclude by launching a goroutine to send out any updates.
g.wg.Add(1)
g.cg.WgAdd(1)
go func() {
defer g.wg.Done()
defer g.cg.WgDone()
defer returnSema()
for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(msg)
ctx, _ := g.cg.Create(context.Background())
err := g.cfg.sendToPeerSync(ctx, msg)
switch {
case err == ErrGossipSyncerExiting:
return
@ -1411,7 +1363,7 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
// If we've been signaled to exit, or are exiting, then we'll stop
// short.
select {
case <-g.quit:
case <-g.cg.Done():
return
default:
}
@ -1520,7 +1472,11 @@ func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
return
}
g.cfg.sendToPeer(msgsToSend...)
ctx, _ := g.cg.Create(context.Background())
if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil {
log.Errorf("unable to send gossip msgs: %v", err)
}
}
// ProcessQueryMsg is used by outside callers to pass new channel time series
@ -1553,7 +1509,7 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
select {
case msgChan <- msg:
case <-peerQuit:
case <-g.quit:
case <-g.cg.Done():
}
return nil
@ -1601,14 +1557,14 @@ func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
select {
case err := <-errChan:
return err
case <-g.quit:
case <-g.cg.Done():
return ErrGossipSyncerExiting
}
}
@ -1687,14 +1643,14 @@ func (g *GossipSyncer) historicalSync() error {
}:
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
case <-g.cg.Done():
return ErrGossiperShuttingDown
}
select {
case <-done:
return nil
case <-g.quit:
case <-g.cg.Done():
return ErrGossiperShuttingDown
}
}

View File

@ -1,6 +1,7 @@
package discovery
import (
"context"
"errors"
"fmt"
"math"
@ -196,15 +197,18 @@ func newTestSyncer(hID lnwire.ShortChannelID,
noSyncChannels: !syncChannels,
noReplyQueries: !replyQueries,
noTimestampQueryOption: !timestamps,
sendToPeer: func(msgs ...lnwire.Message) error {
sendToPeer: func(_ context.Context,
msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
sendToPeerSync: func(msgs ...lnwire.Message) error {
sendToPeerSync: func(_ context.Context,
msgs ...lnwire.Message) error {
msgChan <- msgs
return nil
},
delayedQueryReplyInterval: 2 * time.Second,
bestHeight: func() uint32 {
return latestKnownHeight
},
@ -1557,161 +1561,6 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
}
}
// TestGossipSyncerDelayDOS tests that the gossip syncer will begin delaying
// queries after its prescribed allotment of undelayed query responses. Once
// this happens, all query replies should be delayed by the configurated
// interval.
func TestGossipSyncerDelayDOS(t *testing.T) {
t.Parallel()
// We'll modify the chunk size to be a smaller value, since we'll be
// sending a modest number of queries. After exhausting our undelayed
// gossip queries, we'll send two extra queries and ensure that they are
// delayed properly.
const chunkSize = 2
const numDelayedQueries = 2
const delayTolerance = time.Millisecond * 200
// First, we'll create two GossipSyncer instances with a canned
// sendToPeer message to allow us to intercept their potential sends.
highestID := lnwire.ShortChannelID{
BlockHeight: 1144,
}
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start()
defer syncer2.Stop()
// Record the delayed query reply interval used by each syncer.
delayedQueryInterval := syncer1.cfg.delayedQueryReplyInterval
// Record the number of undelayed queries allowed by the syncers.
numUndelayedQueries := syncer1.cfg.maxUndelayedQueryReplies
// We will send enough queries to exhaust the undelayed responses, and
// then send two more queries which should be delayed. An additional one
// is subtracted from the total since undelayed message will be consumed
// by the initial QueryChannelRange.
numQueryResponses := numUndelayedQueries + numDelayedQueries - 1
// The total number of responses must include the initial reply each
// syncer will make to QueryChannelRange.
numTotalQueries := 1 + numQueryResponses
// The total number of channels each syncer needs to request must be
// scaled by the chunk size being used.
numTotalChans := numQueryResponses * chunkSize
// Construct enough channels so that all of the queries will have enough
// channels. Since syncer1 won't know of any channels, their sets are
// inherently disjoint.
var syncer2Chans []lnwire.ShortChannelID
for i := 0; i < numTotalChans; i++ {
syncer2Chans = append([]lnwire.ShortChannelID{
{
BlockHeight: highestID.BlockHeight - uint32(i) - 1,
TxIndex: uint32(i),
},
}, syncer2Chans...)
}
// We'll kick off the test by asserting syncer1 sends over the
// QueryChannelRange message the other node.
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer1")
case msgs := <-msgChan1:
for _, msg := range msgs {
// The message MUST be a QueryChannelRange message.
_, ok := msg.(*lnwire.QueryChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer2.queryMsgs <- msg:
}
}
}
// At this point, we'll need to a response from syncer2's channel
// series. This will cause syncer1 to simply request the entire set of
// channels from syncer2. This will count as the first undelayed
// response for sycner2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries2.filterRangeReqs:
// We'll send back all the channels that it should know of.
chanSeries2.filterRangeResp <- syncer2Chans
}
// At this point, we'll assert that the ReplyChannelRange message is
// sent by sycner2.
for i := 0; i < numQueryResponses; i++ {
select {
case <-time.After(time.Second * 2):
t.Fatalf("didn't get msg from syncer2")
case msgs := <-msgChan2:
for _, msg := range msgs {
// The message MUST be a ReplyChannelRange message.
_, ok := msg.(*lnwire.ReplyChannelRange)
if !ok {
t.Fatalf("wrong message: expected "+
"QueryChannelRange for %T", msg)
}
select {
case <-time.After(time.Second * 2):
t.Fatalf("node 2 didn't read msg")
case syncer1.gossipMsgs <- msg:
}
}
}
}
// We'll now have syncer1 process the received sids from syncer2.
select {
case <-time.After(time.Second * 2):
t.Fatalf("no query recvd")
case <-chanSeries1.filterReq:
chanSeries1.filterResp <- syncer2Chans
}
// At this point, syncer1 should start to send out initial requests to
// query the chan IDs of the remote party. We'll keep track of the
// number of queries made using the iterated value, which starts at one
// due the initial contribution of the QueryChannelRange msgs.
for i := 1; i < numTotalQueries; i++ {
expDelayResponse := i >= numUndelayedQueries
queryBatch(t,
msgChan1, msgChan2,
syncer1, syncer2,
chanSeries2,
expDelayResponse,
delayedQueryInterval,
delayTolerance,
)
}
}
// queryBatch is a helper method that will query for a single batch of channels
// from a peer and assert the responses. The method can also be used to assert
// the same transition happens, but is delayed by the remote peer's DOS

View File

@ -344,6 +344,13 @@ The underlying functionality between those two options remain the same.
## Breaking Changes
## Performance Improvements
* Users can now [limit the total amount of
  bandwidth](https://github.com/lightningnetwork/lnd/pull/9607) allocated to
  outbound gossip traffic via two new arguments: `--gossip.msg-rate-bytes` and
  `--gossip.msg-burst-bytes`. The burst value should be set to the largest
  number of bytes that can be sent in a single go without rate limiting, and
  the rate value to the sustained bytes/second we'll permit, for example:
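  A minimal `lnd.conf` snippet (the values shown are the documented defaults
  from the sample config; tune to taste):

  ```ini
  [gossip]
  ; Cap outbound gossip to ~100 KiB/s across all peers.
  gossip.msg-rate-bytes=102400
  ; Permit bursts of up to ~200 KiB before throttling kicks in.
  gossip.msg-burst-bytes=204800
  ```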
* Log rotation can now use ZSTD
* [Remove redundant

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
@ -32,6 +33,10 @@ type Gossip struct {
SubBatchDelay time.Duration `long:"sub-batch-delay" description:"The duration to wait before sending the next announcement batch if there are multiple. Use a small value if there are a lot announcements and they need to be broadcast quickly."`
AnnouncementConf uint32 `long:"announcement-conf" description:"The number of confirmations required before processing channel announcements."`
MsgRateBytes uint64 `long:"msg-rate-bytes" description:"The maximum number of bytes of gossip messages that will be sent per second. This is a global limit that applies to all peers."`
MsgBurstBytes uint64 `long:"msg-burst-bytes" description:"The maximum number of bytes of gossip messages that will be sent in a burst. This is a global limit that applies to all peers. This value should be set to something greater than 130 KB"`
}
// Parse the pubkeys for the pinned syncers.
@ -58,6 +63,11 @@ func (g *Gossip) Validate() error {
"%v", g.AnnouncementConf, minAnnouncementConf)
}
if g.MsgBurstBytes < lnwire.MaxSliceLength {
return fmt.Errorf("msg-burst-bytes=%v must be at least %v",
g.MsgBurstBytes, lnwire.MaxSliceLength)
}
return nil
}

View File

@ -1746,6 +1746,17 @@
; The number of confirmations required before processing channel announcements.
; gossip.announcement-conf=6
; The allotted bandwidth rate expressed in bytes/second that will be allocated
; towards outbound gossip messages. Realized rates above this value will be
; throttled. This value is shared across all peers.
; gossip.msg-rate-bytes=102400
; The amount of bytes of gossip messages that can be sent at a given time. This
; is used as the amount of tokens in the token bucket algorithm. This value
; MUST be set to something above 65 KB, otherwise a single max sized message
; can never be sent.
; gossip.msg-burst-bytes=204800
[invoices]
; If a hold invoice has accepted htlcs that reach their expiry height and are

View File

@ -1188,6 +1188,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
IsStillZombieChannel: s.graphBuilder.IsZombieChannel,
ScidCloser: scidCloserMan,
AssumeChannelValid: cfg.Routing.AssumeChannelValid,
MsgRateBytes: cfg.Gossip.MsgRateBytes,
MsgBurstBytes: cfg.Gossip.MsgBurstBytes,
}, nodeKeyDesc)
accessCfg := &accessManConfig{