From 4c737d3f02c4508d6971b9a3fe314d311da891cd Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 18 Sep 2025 14:27:56 -0700 Subject: [PATCH] brontide+peer: use internal sync/pool to reduce allocations This ensures that under medium to high load, we eliminate all allocations once we arrive a steady state, re working memory. --- brontide/conn.go | 7 ++++ brontide/noise.go | 84 +++++++++++++++++++++++++++++++++++++++++++---- peer/brontide.go | 8 +++++ 3 files changed, 93 insertions(+), 6 deletions(-) diff --git a/brontide/conn.go b/brontide/conn.go index e83c8a5a8..bf30e2659 100644 --- a/brontide/conn.go +++ b/brontide/conn.go @@ -287,3 +287,10 @@ func (c *Conn) RemotePub() *btcec.PublicKey { func (c *Conn) LocalPub() *btcec.PublicKey { return c.noise.localStatic.PubKey() } + +// ClearPendingSend drops references to the next header and body buffers and +// returns any pooled buffers back to their respective pools so that the memory +// can be reused. +func (c *Conn) ClearPendingSend() { + c.noise.releaseBuffers() +} diff --git a/brontide/noise.go b/brontide/noise.go index bdd922841..a1b7cd4dd 100644 --- a/brontide/noise.go +++ b/brontide/noise.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math" + "sync" "time" "github.com/btcsuite/btcd/btcec/v2" @@ -69,9 +70,25 @@ var ( ephemeralGen = func() (*btcec.PrivateKey, error) { return btcec.NewPrivateKey() } -) -// TODO(roasbeef): free buffer pool? + // headerBufferPool is a pool for encrypted header buffers. + headerBufferPool = &sync.Pool{ + New: func() interface{} { + b := make([]byte, 0, encHeaderSize) + return &b + }, + } + + // bodyBufferPool is a pool for encrypted message body buffers. + bodyBufferPool = &sync.Pool{ + New: func() interface{} { + // Allocate max size to avoid reallocation. + // maxMessageSize already includes the MAC. + b := make([]byte, 0, maxMessageSize) + return &b + }, + } +) // ecdh performs an ECDH operation between pub and priv. The returned value is // the sha256 of the compressed shared point. @@ -397,6 +414,14 @@ type Machine struct { // out for a pending message. This allows us to tolerate timeout errors // that cause partial writes. nextBodySend []byte + + // pooledHeaderBuf is the pooled buffer used for the header, which we + // need to track so we can return it to the pool when done. + pooledHeaderBuf *[]byte + + // pooledBodyBuf is the pooled buffer used for the body, which we need + // to track so we can return it to the pool when done. + pooledBodyBuf *[]byte } // NewBrontideMachine creates a new instance of the brontide state-machine. If @@ -756,11 +781,33 @@ func (b *Machine) WriteMessage(p []byte) error { binary.BigEndian.PutUint16(b.pktLenBuffer[:], fullLength) - // First, generate the encrypted+MAC'd length prefix for the packet. - b.nextHeaderSend = b.sendCipher.Encrypt(nil, nil, b.pktLenBuffer[:]) + headerBufInterface := headerBufferPool.Get() + headerBuf, ok := headerBufInterface.(*[]byte) + if !ok { + b.releaseBuffers() + return fmt.Errorf("headerBufferPool returned unexpected "+ + "type: %T", headerBufInterface) + } + b.pooledHeaderBuf = headerBuf - // Finally, generate the encrypted packet itself. - b.nextBodySend = b.sendCipher.Encrypt(nil, nil, p) + bodyBufInterface := bodyBufferPool.Get() + bodyBuf, ok := bodyBufInterface.(*[]byte) + if !ok { + b.releaseBuffers() + return fmt.Errorf("bodyBufferPool returned unexpected "+ + "type: %T", bodyBufInterface) + } + b.pooledBodyBuf = bodyBuf + + // First, generate the encrypted+MAC'd length prefix for the packet. We + // pass our pooled buffer as the cipherText (dst) parameter. + b.nextHeaderSend = b.sendCipher.Encrypt( + nil, *b.pooledHeaderBuf, b.pktLenBuffer[:], + ) + + // Finally, generate the encrypted packet itself. We pass our pooled + // buffer as the cipherText (dst) parameter. + b.nextBodySend = b.sendCipher.Encrypt(nil, *b.pooledBodyBuf, p) return nil } @@ -837,9 +884,34 @@ func (b *Machine) Flush(w io.Writer) (int, error) { } } + // If both header and body have been fully flushed, release the pooled + // buffers back to their pools. + if len(b.nextHeaderSend) == 0 && len(b.nextBodySend) == 0 { + b.releaseBuffers() + } + return nn, nil } +// releaseBuffers returns the pooled buffers back to their respective pools +// and clears the references. +func (b *Machine) releaseBuffers() { + if b.pooledHeaderBuf != nil { + *b.pooledHeaderBuf = (*b.pooledHeaderBuf)[:0] + headerBufferPool.Put(b.pooledHeaderBuf) + b.pooledHeaderBuf = nil + } + + if b.pooledBodyBuf != nil { + *b.pooledBodyBuf = (*b.pooledBodyBuf)[:0] + bodyBufferPool.Put(b.pooledBodyBuf) + b.pooledBodyBuf = nil + } + + b.nextHeaderSend = nil + b.nextBodySend = nil +} + // ReadMessage attempts to read the next message from the passed io.Reader. In // the case of an authentication error, a non-nil error is returned. func (b *Machine) ReadMessage(r io.Reader) ([]byte, error) { diff --git a/peer/brontide.go b/peer/brontide.go index 57e340fb0..1c9073f19 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -19,6 +19,7 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/buffer" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -2705,6 +2706,13 @@ out: goto retry } + // Message has either been successfully sent or an + // unrecoverable error occurred. Either way, we can + // free the memory used to store the message. + if bConn, ok := p.cfg.Conn.(*brontide.Conn); ok { + bConn.ClearPendingSend() + } + // The write succeeded, reset the idle timer to prevent // us from disconnecting the peer. if !idleTimer.Stop() {