Merge pull request #7446 from yyforyongyu/fix-future-msg

discovery: flatten future msg cache and increase its size
Olaoluwa Osuntokun 2023-03-06 11:21:48 -08:00 committed by GitHub
commit 4b88020269
4 changed files with 132 additions and 49 deletions

@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/btcsuite/btcd/btcec/v2"
@@ -47,6 +48,10 @@ const (
     // updates that we'll hold onto.
     maxPrematureUpdates = 100

+    // maxFutureMessages tracks the max amount of future messages that
+    // we'll hold onto.
+    maxFutureMessages = 1000
+
     // DefaultSubBatchDelay is the default delay we'll use when
     // broadcasting the next announcement batch.
     DefaultSubBatchDelay = 5 * time.Second
@@ -424,8 +429,8 @@ type AuthenticatedGossiper struct {
     // height specified in the future. We will save them and resend them
     // to the networkMsgs chan once the block height has been reached. The
     // cached map format is,
-    //    {blockHeight: [msg1, msg2, ...], ...}
-    futureMsgs *lru.Cache[uint32, *cachedNetworkMsg]
+    //    {msgID1: msg1, msgID2: msg2, ...}
+    futureMsgs *futureMsgCache

     // chanPolicyUpdates is a channel over which requests to update the
     // forwarding policy of a set of channels are sent.
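
The flattening above is the heart of the change: instead of one LRU slot per block height holding a growing slice of messages, every message gets its own slot keyed by a monotonically increasing ID, so eviction happens one message at a time. Below is a minimal sketch of that scheme, assuming the generic neutrino `lru`/`cache` API used elsewhere in this diff; the `demoMsg` type is hypothetical, standing in for `cachedFutureMsg`.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/lightninglabs/neutrino/cache"
	"github.com/lightninglabs/neutrino/cache/lru"
)

// demoMsg stands in for cachedFutureMsg. Size returning a constant 1 makes
// the cache capacity count items rather than bytes.
type demoMsg struct {
	height uint32
}

func (m *demoMsg) Size() (uint64, error) { return 1, nil }

func main() {
	// A monotonically increasing counter supplies the cache keys.
	var msgID atomic.Uint64

	// Use a tiny capacity so eviction is easy to observe.
	c := lru.NewCache[uint64, *demoMsg](2)

	// Insert three messages under fresh IDs; the third Put evicts the
	// least recently used entry, which is ID 1.
	for _, h := range []uint32{100, 101, 102} {
		id := msgID.Add(1)
		evicted, err := c.Put(id, &demoMsg{height: h})
		if err != nil {
			panic(err)
		}
		fmt.Printf("put id=%d height=%d evicted=%v\n", id, h, evicted)
	}

	// ID 1 is gone while IDs 2 and 3 remain.
	if _, err := c.Get(1); err == cache.ErrElementNotFound {
		fmt.Println("id=1 was evicted")
	}
}

Because `Size` reports a constant 1, the capacity handed to `NewCache` bounds the number of cached messages, which is why the `maxFutureMessages` constant added above can be read directly as a message count.
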
@@ -478,13 +483,11 @@ type AuthenticatedGossiper struct {
 // passed configuration parameters.
 func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
     gossiper := &AuthenticatedGossiper{
-        selfKey:     selfKeyDesc.PubKey,
-        selfKeyLoc:  selfKeyDesc.KeyLocator,
-        cfg:         &cfg,
-        networkMsgs: make(chan *networkMsg),
-        futureMsgs: lru.NewCache[uint32, *cachedNetworkMsg](
-            maxPrematureUpdates,
-        ),
+        selfKey:           selfKeyDesc.PubKey,
+        selfKeyLoc:        selfKeyDesc.KeyLocator,
+        cfg:               &cfg,
+        networkMsgs:       make(chan *networkMsg),
+        futureMsgs:        newFutureMsgCache(maxFutureMessages),
         quit:              make(chan struct{}),
         chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
         prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: lll
@@ -632,33 +635,89 @@ func (d *AuthenticatedGossiper) syncBlockHeight() {
     }
 }

+// futureMsgCache embeds an `lru.Cache` with a message counter that serves as
+// the unique ID when saving a message.
+type futureMsgCache struct {
+    *lru.Cache[uint64, *cachedFutureMsg]
+
+    // msgID is a monotonically increasing integer.
+    msgID atomic.Uint64
+}
+
+// nextMsgID returns a unique message ID.
+func (f *futureMsgCache) nextMsgID() uint64 {
+    return f.msgID.Add(1)
+}
+
+// newFutureMsgCache creates a new future message cache whose underlying lru
+// cache is initialized with the specified capacity.
+func newFutureMsgCache(capacity uint64) *futureMsgCache {
+    // Create a new cache.
+    cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
+
+    return &futureMsgCache{
+        Cache: cache,
+    }
+}
+
+// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
+type cachedFutureMsg struct {
+    // msg is the network message.
+    msg *networkMsg
+
+    // height is the block height.
+    height uint32
+}
+
+// Size returns the size of the message.
+func (c *cachedFutureMsg) Size() (uint64, error) {
+    // Return a constant 1.
+    return 1, nil
+}
+
 // resendFutureMessages takes a block height, resends all the future messages
-// found at that height and deletes those messages found in the gossiper's
+// found at or below that height and deletes them from the gossiper's
 // futureMsgs.
 func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
-    result, err := d.futureMsgs.Get(height)
+    var (
+        // msgs are the target messages.
+        msgs []*networkMsg
+
+        // keys are the target messages' caching keys.
+        keys []uint64
+    )
+
+    // filterMsgs is the visitor used when iterating the future cache.
+    filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
+        if cmsg.height <= height {
+            msgs = append(msgs, cmsg.msg)
+            keys = append(keys, k)
+        }
+
+        return true
+    }
+
+    // Filter out the target messages.
+    d.futureMsgs.Range(filterMsgs)

     // Return early if no messages found.
-    if err == cache.ErrElementNotFound {
+    if len(msgs) == 0 {
         return
     }

-    // The error must nil, we will log an error and exit.
-    if err != nil {
-        log.Errorf("Reading future messages got error: %v", err)
-        return
+    // Remove the filtered messages.
+    for _, key := range keys {
+        d.futureMsgs.Delete(key)
     }

-    msgs := result.msgs
-
     log.Debugf("Resending %d network messages at height %d",
         len(msgs), height)

-    for _, pMsg := range msgs {
+    for _, msg := range msgs {
         select {
-        case d.networkMsgs <- pMsg.msg:
+        case d.networkMsgs <- msg:
         case <-d.quit:
-            pMsg.msg.err <- ErrGossiperShuttingDown
+            msg.err <- ErrGossiperShuttingDown
         }
     }
 }
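
With heights no longer usable as lookup keys, replay becomes a two-phase scan, as the rewritten `resendFutureMessages` above shows: collect matching keys while ranging, then delete outside the iteration. Here is a standalone sketch of the same pattern, reusing the hypothetical `demoMsg` stand-in from the earlier example and a hypothetical helper `resendUpTo`.

package main

import (
	"fmt"

	"github.com/lightninglabs/neutrino/cache/lru"
)

// demoMsg is the same hypothetical stand-in used in the earlier sketch.
type demoMsg struct {
	height uint32
}

func (m *demoMsg) Size() (uint64, error) { return 1, nil }

// resendUpTo mimics resendFutureMessages: it gathers every cached message
// whose height is at or below the given height, deletes those entries, and
// returns the collected messages (the real code forwards them to the
// networkMsgs channel instead).
func resendUpTo(c *lru.Cache[uint64, *demoMsg], height uint32) []*demoMsg {
	var (
		msgs []*demoMsg
		keys []uint64
	)

	// Phase one: visit every entry; returning true keeps iterating.
	c.Range(func(k uint64, m *demoMsg) bool {
		if m.height <= height {
			msgs = append(msgs, m)
			keys = append(keys, k)
		}
		return true
	})

	// Phase two: delete after the scan, so the cache is never mutated
	// while it is being ranged over.
	for _, k := range keys {
		c.Delete(k)
	}

	return msgs
}

func main() {
	c := lru.NewCache[uint64, *demoMsg](10)
	for id, h := range map[uint64]uint32{1: 100, 2: 105, 3: 101} {
		if _, err := c.Put(id, &demoMsg{height: h}); err != nil {
			panic(err)
		}
	}

	// Replays the two messages cached at heights 100 and 101.
	fmt.Println("replayed:", len(resendUpTo(c, 101)))
}

The scan is O(n) per new block rather than the old O(1) height lookup, which seems an acceptable trade here since the cache is capped at `maxFutureMessages` entries.
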
@@ -1875,23 +1934,11 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
         return false
     }

-    // Add the premature message to our future messages which will
-    // be resent once the block height has reached.
+    // Add the premature message to our future messages which will be
+    // resent once the block height has been reached.
     //
-    // Init an empty cached message and overwrite it if there are cached
-    // messages found.
-    cachedMsgs := &cachedNetworkMsg{
-        msgs: make([]*processedNetworkMsg, 0),
-    }
-
-    result, err := d.futureMsgs.Get(msgHeight)
-    // No error returned means we have old messages cached.
-    if err == nil {
-        cachedMsgs = result
-    }
-
-    // Copy the networkMsgs since the old message's err chan will
-    // be consumed.
+    // Copy the networkMsg since the old message's err chan will be
+    // consumed.
     copied := &networkMsg{
         peer:   msg.peer,
         source: msg.source,

@@ -1901,15 +1948,15 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
         err:    make(chan error, 1),
     }

-    // The processed boolean is unused in the futureMsgs case.
-    pMsg := &processedNetworkMsg{msg: copied}
+    // Create the cached message.
+    cachedMsg := &cachedFutureMsg{
+        msg:    copied,
+        height: msgHeight,
+    }

-    // Add the network message.
-    msgs := cachedMsgs.msgs
-    msgs = append(msgs, pMsg)
-    _, err = d.futureMsgs.Put(msgHeight, &cachedNetworkMsg{
-        msgs: msgs,
-    })
+    // Increment the msg ID and add it to the cache.
+    nextMsgID := d.futureMsgs.nextMsgID()
+    _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
     if err != nil {
         log.Errorf("Adding future message got error: %v", err)
     }
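
Worth noting is how cheap the new cache keys are: `nextMsgID` is just an atomic increment, so concurrent callers obtain unique, ordered IDs without a mutex, and since an `atomic.Uint64` zero value starts at zero, the first ID handed out is 1 (which the test below relies on). A small sketch of that uniqueness property; the loop sizes here are arbitrary:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var msgID atomic.Uint64

	// 100 goroutines each claim 10 IDs; every ID is unique because
	// atomic.Uint64.Add is a single indivisible read-modify-write.
	var (
		wg   sync.WaitGroup
		seen sync.Map
	)

	for g := 0; g < 100; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				id := msgID.Add(1)
				if _, dup := seen.LoadOrStore(id, true); dup {
					panic("duplicate ID")
				}
			}
		}()
	}
	wg.Wait()

	fmt.Printf("issued 1000 unique IDs, last=%d\n", msgID.Load())
}
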

@@ -20,6 +20,7 @@ import (
     "github.com/btcsuite/btcd/wire"
     "github.com/davecgh/go-spew/spew"
     "github.com/go-errors/errors"
+    "github.com/lightninglabs/neutrino/cache"
     "github.com/lightningnetwork/lnd/batch"
     "github.com/lightningnetwork/lnd/chainntnfs"
     "github.com/lightningnetwork/lnd/channeldb"
@@ -4098,3 +4099,38 @@ func TestRejectCacheChannelAnn(t *testing.T) {
         t.Fatal("did not process remote announcement")
     }
 }
+
+// TestFutureMsgCacheEviction checks that when the cache's capacity is
+// reached, saving one more item will evict the oldest item.
+func TestFutureMsgCacheEviction(t *testing.T) {
+    t.Parallel()
+
+    // Create a future message cache with size 1.
+    c := newFutureMsgCache(1)
+
+    // Send two messages to the cache, which results in the first message
+    // being evicted.
+    //
+    // Put the first item.
+    id := c.nextMsgID()
+    evicted, err := c.Put(id, &cachedFutureMsg{height: uint32(id)})
+    require.NoError(t, err)
+    require.False(t, evicted, "should not be evicted")
+
+    // Put the second item.
+    id = c.nextMsgID()
+    evicted, err = c.Put(id, &cachedFutureMsg{height: uint32(id)})
+    require.NoError(t, err)
+    require.True(t, evicted, "should be evicted")
+
+    // The first item should have been evicted.
+    //
+    // NOTE: msg ID starts at 1, not 0.
+    _, err = c.Get(1)
+    require.ErrorIs(t, err, cache.ErrElementNotFound)
+
+    // The second item should be found.
+    item, err := c.Get(2)
+    require.NoError(t, err)
+    require.EqualValues(t, 2, item.height, "should be the second item")
+}

go.mod

@@ -31,7 +31,7 @@ require (
     github.com/jrick/logrotate v1.0.0
     github.com/kkdai/bstream v1.0.0
     github.com/lightninglabs/neutrino v0.15.0
-    github.com/lightninglabs/neutrino/cache v1.1.0
+    github.com/lightninglabs/neutrino/cache v1.1.1
     github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display
     github.com/lightningnetwork/lightning-onion v1.2.1-0.20221202012345-ca23184850a1
     github.com/lightningnetwork/lnd/cert v1.2.1

go.sum

@@ -393,8 +393,8 @@ github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI
 github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
 github.com/lightninglabs/neutrino v0.15.0 h1:yr3uz36fLAq8hyM0TRUVlef1TRNoWAqpmmNlVtKUDtI=
 github.com/lightninglabs/neutrino v0.15.0/go.mod h1:pmjwElN/091TErtSE9Vd5W4hpxoG2/+xlb+HoPm9Gug=
-github.com/lightninglabs/neutrino/cache v1.1.0 h1:szZIhVabiQIsGzJjhvo76sj05Au+zVotj2M34EquGME=
-github.com/lightninglabs/neutrino/cache v1.1.0/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
+github.com/lightninglabs/neutrino/cache v1.1.1 h1:TllWOSlkABhpgbWJfzsrdUaDH2fBy/54VSIB4vVqV8M=
+github.com/lightninglabs/neutrino/cache v1.1.1/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
 github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display h1:RZJ8H4ueU/aQ9pFtx5wqsuD3B/DezrewJeVwDKKYY8E=
 github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display/go.mod h1:2oKOBU042GKFHrdbgGiKax4xVrFiZu51lhacUZQ9MnE=
 github.com/lightningnetwork/lightning-onion v1.2.1-0.20221202012345-ca23184850a1 h1:Wm0g70gkcAu2pGpNZwfWPSVOY21j8IyYsNewwK4OkT4=