peer: add RED and protected types to msgStream queue

The msgStream's backpressure queue previously used a drop predicate that
always returned false, meaning messages were never dropped based on
queue length.

This commit introduces a new drop predicate mechanism for the msgStream
queue, controlled by build tags.

For non-integration builds, the predicate combines a type-based check
with Random Early Detection (RED):

- Certain critical message types (`lnwire.LinkUpdater`,
  `lnwire.AnnounceSignatures1`) are marked as protected and are never
  dropped.

- For other message types, RED is applied based on the queue length,
  using `redMinThreshold` and `redMaxThreshold` to determine the drop
  probability.

For integration builds, the predicate always returns false, preserving
the previous behavior to avoid interfering with tests.

This change allows the msgStream queue to proactively drop less critical
messages under high load, preventing unbounded queue growth while
ensuring essential messages are prioritized.
This commit is contained in:
Olaoluwa Osuntokun
2025-05-19 17:25:07 -07:00
parent 3b27b69989
commit f59fa7477d
3 changed files with 81 additions and 18 deletions

View File

@@ -1754,14 +1754,11 @@ func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize int,
quit: make(chan struct{}),
}
// Initialize the backpressure queue. The predicate always returns
// false, meaning we don't proactively drop messages based on queue
// length here.
alwaysFalsePredicate := func(int, lnwire.Message) bool {
return false
}
// Initialize the backpressure queue with a predicate determined by
// build tags.
dropPredicate := getMsgStreamDropPredicate()
stream.queue = queue.NewBackpressureQueue[lnwire.Message](
bufSize, alwaysFalsePredicate,
bufSize, dropPredicate,
)
return stream
@@ -1820,20 +1817,12 @@ func (ms *msgStream) msgConsumer() {
func (ms *msgStream) AddMsg(ctx context.Context, msg lnwire.Message) {
dropped, err := ms.queue.Enqueue(ctx, msg).Unpack()
if err != nil {
if !errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
ms.peer.log.Warnf("msgStream.AddMsg: failed to "+
"enqueue message: %v", err)
} else {
ms.peer.log.Tracef("msgStream.AddMsg: context "+
"canceled during enqueue: %v", err)
}
ms.peer.log.Warnf("unable to enqueue message: %v", err)
return
}
if dropped {
ms.peer.log.Debugf("msgStream.AddMsg: message %T "+
"dropped from queue", msg)
ms.peer.log.Debugf("message %T dropped by predicate", msg)
}
}

57
peer/drop_predicate.go Normal file
View File

@@ -0,0 +1,57 @@
//go:build !integration
package peer
import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)
const (
// redMinThreshold is the minimum queue length before RED starts dropping
// messages.
redMinThreshold = 10
// redMaxThreshold is the queue length at or above which RED drops all
// messages (that are not protected by type).
redMaxThreshold = 40
)
// isProtectedMsgType checks if a message is of a type that should not be
// dropped by the predicate.
func isProtectedMsgType(msg lnwire.Message) bool {
switch msg.(type) {
// Never drop any messages that are heading to an active channel.
case lnwire.LinkUpdater:
return true
// Make sure to never drop an incoming announcement signatures
// message, as we need this to be able to advertise channels.
//
// TODO(roasbeef): don't drop any gossip if doing IGD?
case *lnwire.AnnounceSignatures1:
return true
default:
return false
}
}
// getMsgStreamDropPredicate returns the drop predicate for the msgStream's
// BackpressureQueue. For non-integration builds, this combines a type-based
// check for critical messages with Random Early Detection (RED).
func getMsgStreamDropPredicate() queue.DropPredicate[lnwire.Message] {
redPred := queue.RandomEarlyDrop[lnwire.Message](
redMinThreshold, redMaxThreshold,
)
// We'll never dropped protected messages, for the rest we'll use the
// RED predicate.
return func(queueLen int, item lnwire.Message) bool {
if isProtectedMsgType(item) {
return false
}
return redPred(queueLen, item)
}
}

View File

@@ -0,0 +1,17 @@
//go:build integration
package peer
import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)
// getMsgStreamDropPredicate returns the drop predicate for the msgStream's
// BackpressureQueue. For integration builds, this predicate never drops
// messages.
func getMsgStreamDropPredicate() queue.DropPredicate[lnwire.Message] {
return func(queueLen int, item lnwire.Message) bool {
return false
}
}