brontide+peer: use internal sync/pool to reduce allocations

This ensures that under medium to high load, we eliminate all
allocations once we arrive a steady state, re working memory.
This commit is contained in:
Olaoluwa Osuntokun
2025-09-18 14:27:56 -07:00
parent f968206849
commit 4c737d3f02
3 changed files with 93 additions and 6 deletions

View File

@@ -287,3 +287,10 @@ func (c *Conn) RemotePub() *btcec.PublicKey {
func (c *Conn) LocalPub() *btcec.PublicKey {
return c.noise.localStatic.PubKey()
}
// ClearPendingSend drops references to the next header and body buffers and
// returns any pooled buffers back to their respective pools so that the memory
// can be reused.
func (c *Conn) ClearPendingSend() {
c.noise.releaseBuffers()
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
@@ -69,9 +70,25 @@ var (
ephemeralGen = func() (*btcec.PrivateKey, error) {
return btcec.NewPrivateKey()
}
)
// TODO(roasbeef): free buffer pool?
// headerBufferPool is a pool for encrypted header buffers.
headerBufferPool = &sync.Pool{
New: func() interface{} {
b := make([]byte, 0, encHeaderSize)
return &b
},
}
// bodyBufferPool is a pool for encrypted message body buffers.
bodyBufferPool = &sync.Pool{
New: func() interface{} {
// Allocate max size to avoid reallocation.
// maxMessageSize already includes the MAC.
b := make([]byte, 0, maxMessageSize)
return &b
},
}
)
// ecdh performs an ECDH operation between pub and priv. The returned value is
// the sha256 of the compressed shared point.
@@ -397,6 +414,14 @@ type Machine struct {
// out for a pending message. This allows us to tolerate timeout errors
// that cause partial writes.
nextBodySend []byte
// pooledHeaderBuf is the pooled buffer used for the header, which we
// need to track so we can return it to the pool when done.
pooledHeaderBuf *[]byte
// pooledBodyBuf is the pooled buffer used for the body, which we need
// to track so we can return it to the pool when done.
pooledBodyBuf *[]byte
}
// NewBrontideMachine creates a new instance of the brontide state-machine. If
@@ -756,11 +781,33 @@ func (b *Machine) WriteMessage(p []byte) error {
binary.BigEndian.PutUint16(b.pktLenBuffer[:], fullLength)
// First, generate the encrypted+MAC'd length prefix for the packet.
b.nextHeaderSend = b.sendCipher.Encrypt(nil, nil, b.pktLenBuffer[:])
headerBufInterface := headerBufferPool.Get()
headerBuf, ok := headerBufInterface.(*[]byte)
if !ok {
b.releaseBuffers()
return fmt.Errorf("headerBufferPool returned unexpected "+
"type: %T", headerBufInterface)
}
b.pooledHeaderBuf = headerBuf
// Finally, generate the encrypted packet itself.
b.nextBodySend = b.sendCipher.Encrypt(nil, nil, p)
bodyBufInterface := bodyBufferPool.Get()
bodyBuf, ok := bodyBufInterface.(*[]byte)
if !ok {
b.releaseBuffers()
return fmt.Errorf("bodyBufferPool returned unexpected "+
"type: %T", bodyBufInterface)
}
b.pooledBodyBuf = bodyBuf
// First, generate the encrypted+MAC'd length prefix for the packet. We
// pass our pooled buffer as the cipherText (dst) parameter.
b.nextHeaderSend = b.sendCipher.Encrypt(
nil, *b.pooledHeaderBuf, b.pktLenBuffer[:],
)
// Finally, generate the encrypted packet itself. We pass our pooled
// buffer as the cipherText (dst) parameter.
b.nextBodySend = b.sendCipher.Encrypt(nil, *b.pooledBodyBuf, p)
return nil
}
@@ -837,9 +884,34 @@ func (b *Machine) Flush(w io.Writer) (int, error) {
}
}
// If both header and body have been fully flushed, release the pooled
// buffers back to their pools.
if len(b.nextHeaderSend) == 0 && len(b.nextBodySend) == 0 {
b.releaseBuffers()
}
return nn, nil
}
// releaseBuffers returns the pooled buffers back to their respective pools
// and clears the references.
func (b *Machine) releaseBuffers() {
if b.pooledHeaderBuf != nil {
*b.pooledHeaderBuf = (*b.pooledHeaderBuf)[:0]
headerBufferPool.Put(b.pooledHeaderBuf)
b.pooledHeaderBuf = nil
}
if b.pooledBodyBuf != nil {
*b.pooledBodyBuf = (*b.pooledBodyBuf)[:0]
bodyBufferPool.Put(b.pooledBodyBuf)
b.pooledBodyBuf = nil
}
b.nextHeaderSend = nil
b.nextBodySend = nil
}
// ReadMessage attempts to read the next message from the passed io.Reader. In
// the case of an authentication error, a non-nil error is returned.
func (b *Machine) ReadMessage(r io.Reader) ([]byte, error) {

View File

@@ -19,6 +19,7 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@@ -2705,6 +2706,13 @@ out:
goto retry
}
// Message has either been successfully sent or an
// unrecoverable error occurred. Either way, we can
// free the memory used to store the message.
if bConn, ok := p.cfg.Conn.(*brontide.Conn); ok {
bConn.ClearPendingSend()
}
// The write succeeded, reset the idle timer to prevent
// us from disconnecting the peer.
if !idleTimer.Stop() {