discovery: pass context through to reliable sender

And remove a context.TODO() that was added in the previous commit.
Elle Mouton
2025-04-07 10:03:29 +02:00
parent 7fd453f319
commit 4a30d6243d
3 changed files with 51 additions and 41 deletions

discovery/gossiper.go

@@ -602,11 +602,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 		NotifyWhenOnline:  cfg.NotifyWhenOnline,
 		NotifyWhenOffline: cfg.NotifyWhenOffline,
 		MessageStore:      cfg.MessageStore,
-		IsMsgStale: func(message lnwire.Message) bool {
-			ctx := context.TODO()
-
-			return gossiper.isMsgStale(ctx, message)
-		},
+		IsMsgStale:        gossiper.isMsgStale,
 	})

 	return gossiper
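Note on this hunk: once isMsgStale itself accepts a context, the anonymous adapter (and its context.TODO() placeholder) is no longer needed, because a method value can be assigned directly wherever its signature matches the field's type exactly. A self-contained sketch of that Go feature, using hypothetical stand-in types rather than lnd's:

package main

import (
	"context"
	"fmt"
)

type gossiper struct{}

// isMsgStale mirrors the new callback shape: context first, message second.
func (g *gossiper) isMsgStale(ctx context.Context, msg string) bool {
	_ = ctx // a real implementation would thread ctx into its graph lookups
	return msg == "stale"
}

type senderCfg struct {
	// IsMsgStale has the same signature as (*gossiper).isMsgStale, so a
	// method value can be assigned without a wrapping closure.
	IsMsgStale func(context.Context, string) bool
}

func main() {
	g := &gossiper{}
	cfg := senderCfg{IsMsgStale: g.isMsgStale} // no adapter, no context.TODO()
	fmt.Println(cfg.IsMsgStale(context.Background(), "stale")) // true
}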
@@ -678,7 +674,7 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error {
 	// Start the reliable sender. In case we had any pending messages ready
 	// to be sent when the gossiper was last shut down, we must continue on
 	// our quest to deliver them to their respective peers.
-	if err := d.reliableSender.Start(); err != nil {
+	if err := d.reliableSender.Start(ctx); err != nil {
 		return err
 	}
@@ -1889,7 +1885,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
 			edgeInfo.Info, chanUpdate.ChannelFlags,
 		)
 		err := d.reliableSender.sendMessage(
-			chanUpdate, remotePubKey,
+			ctx, chanUpdate, remotePubKey,
 		)
 		if err != nil {
 			log.Errorf("Unable to reliably send %v for "+
@@ -3333,7 +3329,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
 		// Now we'll attempt to send the channel update message
 		// reliably to the remote peer in the background, so that we
 		// don't block if the peer happens to be offline at the moment.
-		err := d.reliableSender.sendMessage(upd, remotePubKey)
+		err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
 		if err != nil {
 			err := fmt.Errorf("unable to reliably send %v for "+
 				"channel=%v to peer=%x: %v", upd.MsgType(),
@@ -3470,7 +3466,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
 		// Since the remote peer might not be online we'll call a
 		// method that will attempt to deliver the proof when it comes
 		// online.
-		err := d.reliableSender.sendMessage(ann, remotePubKey)
+		err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
 		if err != nil {
 			err := fmt.Errorf("unable to reliably send %v for "+
 				"channel=%v to peer=%x: %v", ann.MsgType(),

discovery/reliable_sender.go

@@ -1,8 +1,10 @@
 package discovery

 import (
+	"context"
 	"sync"

+	"github.com/lightningnetwork/lnd/fn/v2"
 	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
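The new fn/v2 import provides the Option container used below to hold the cancel function: its zero value is None, fn.Some wraps a value, and WhenSome invokes a callback only when a value is present. A small usage sketch (consult the fn package itself for the authoritative API):

package main

import (
	"context"
	"fmt"

	"github.com/lightningnetwork/lnd/fn/v2"
)

func main() {
	// Zero value: nothing stored, so WhenSome is a safe no-op.
	var cancel fn.Option[context.CancelFunc]
	cancel.WhenSome(func(c context.CancelFunc) { c() })

	ctx, cancelFn := context.WithCancel(context.Background())
	cancel = fn.Some(cancelFn)

	// With a value present, the callback runs and cancels the context.
	cancel.WhenSome(func(c context.CancelFunc) { c() })
	fmt.Println(ctx.Err()) // context canceled
}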
@@ -28,7 +30,7 @@ type reliableSenderCfg struct {

 	// IsMsgStale determines whether a message retrieved from the backing
 	// MessageStore is seen as stale by the current graph.
-	IsMsgStale func(lnwire.Message) bool
+	IsMsgStale func(context.Context, lnwire.Message) bool
 }

 // peerManager contains the set of channels required for the peerHandler to
@@ -59,8 +61,9 @@ type reliableSender struct {
 	activePeers    map[[33]byte]peerManager
 	activePeersMtx sync.Mutex

-	wg   sync.WaitGroup
-	quit chan struct{}
+	wg     sync.WaitGroup
+	quit   chan struct{}
+	cancel fn.Option[context.CancelFunc]
 }

 // newReliableSender returns a new reliableSender backed by the given config.
@@ -73,10 +76,13 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
 }

 // Start spawns message handlers for any peers with pending messages.
-func (s *reliableSender) Start() error {
+func (s *reliableSender) Start(ctx context.Context) error {
 	var err error
 	s.start.Do(func() {
-		err = s.resendPendingMsgs()
+		ctx, cancel := context.WithCancel(ctx)
+		s.cancel = fn.Some(cancel)
+
+		err = s.resendPendingMsgs(ctx)
 	})
 	return err
 }
@@ -87,6 +93,7 @@ func (s *reliableSender) Stop() {
 		log.Debugf("reliableSender is stopping")
 		defer log.Debugf("reliableSender stopped")

+		s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
 		close(s.quit)
 		s.wg.Wait()
 	})
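Start and Stop together now form a familiar Go lifecycle pattern: Start derives a cancellable child context and stashes its cancel function, and Stop fires that cancel (alongside the pre-existing quit channel) before waiting on the WaitGroup. A condensed, hypothetical version of the pattern, not the literal lnd code:

package main

import (
	"context"
	"sync"

	"github.com/lightningnetwork/lnd/fn/v2"
)

type lifecycle struct {
	start  sync.Once
	stop   sync.Once
	wg     sync.WaitGroup
	quit   chan struct{}
	cancel fn.Option[context.CancelFunc]
}

func newLifecycle() *lifecycle {
	return &lifecycle{quit: make(chan struct{})}
}

func (l *lifecycle) Start(ctx context.Context) {
	l.start.Do(func() {
		// Child context: cancelling it tears down everything spawned
		// from Start without touching the caller's parent context.
		ctx, cancel := context.WithCancel(ctx)
		l.cancel = fn.Some(cancel)

		l.wg.Add(1)
		go func() {
			defer l.wg.Done()
			<-ctx.Done() // workers watch ctx (and/or quit)
		}()
	})
}

func (l *lifecycle) Stop() {
	l.stop.Do(func() {
		// If Start never ran there is no cancel func stored; WhenSome
		// keeps this safe instead of risking a nil-func call.
		l.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
		close(l.quit)
		l.wg.Wait()
	})
}

func main() {
	l := newLifecycle()
	l.Start(context.Background())
	l.Stop() // returns once the worker has observed cancellation
}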
@@ -96,7 +103,9 @@ func (s *reliableSender) Stop() {
 // event that the peer is currently offline, this will only write the message to
 // disk. Once the peer reconnects, this message, along with any others pending,
 // will be sent to the peer.
-func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
+func (s *reliableSender) sendMessage(ctx context.Context, msg lnwire.Message,
+	peerPubKey [33]byte) error {
+
 	// We'll start by persisting the message to disk. This allows us to
 	// resend the message upon restarts and peer reconnections.
 	if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
@@ -106,7 +115,7 @@ func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) er
 	// Then, we'll spawn a peerHandler for this peer to handle resending its
 	// pending messages while taking into account its connection lifecycle.
spawnHandler:
-	msgHandler, ok := s.spawnPeerHandler(peerPubKey)
+	msgHandler, ok := s.spawnPeerHandler(ctx, peerPubKey)

 	// If the handler wasn't previously active, we can exit now as we know
 	// that the message will be sent once the peer online notification is
@@ -134,7 +143,7 @@ spawnHandler:
 // spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't
 // one already active. The boolean returned signals whether there was already
 // one active or not.
-func (s *reliableSender) spawnPeerHandler(
+func (s *reliableSender) spawnPeerHandler(ctx context.Context,
 	peerPubKey [33]byte) (peerManager, bool) {

 	s.activePeersMtx.Lock()
@@ -152,7 +161,7 @@ func (s *reliableSender) spawnPeerHandler(
 	// peerHandler.
 	if !ok {
 		s.wg.Add(1)
-		go s.peerHandler(msgHandler, peerPubKey)
+		go s.peerHandler(ctx, msgHandler, peerPubKey)
 	}

 	return msgHandler, ok
@@ -164,7 +173,9 @@ func (s *reliableSender) spawnPeerHandler(
 // offline will be queued and sent once the peer reconnects.
 //
 // NOTE: This must be run as a goroutine.
-func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
+func (s *reliableSender) peerHandler(ctx context.Context, peerMgr peerManager,
+	peerPubKey [33]byte) {
+
 	defer s.wg.Done()

 	// We'll start by requesting a notification for when the peer
@@ -252,7 +263,7 @@ out:
 		// check whether it's stale. This guarantees that
 		// AnnounceSignatures are sent at least once if we happen to
 		// already have signatures for both parties.
-		if s.cfg.IsMsgStale(msg) {
+		if s.cfg.IsMsgStale(ctx, msg) {
 			err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
 			if err != nil {
 				log.Errorf("Unable to remove stale %v message "+
@@ -321,7 +332,7 @@ out:

 // resendPendingMsgs retrieves and sends all of the messages within the message
 // store that should be reliably sent to their respective peers.
-func (s *reliableSender) resendPendingMsgs() error {
+func (s *reliableSender) resendPendingMsgs(ctx context.Context) error {
 	// Fetch all of the peers for which we have pending messages for and
 	// spawn a peerMsgHandler for each. Once the peer is seen as online, all
 	// of the pending messages will be sent.
@@ -331,7 +342,7 @@ func (s *reliableSender) resendPendingMsgs() error {
 	}

 	for peer := range peers {
-		s.spawnPeerHandler(ctx, peer)
+		s.spawnPeerHandler(ctx, peer)
 	}

 	return nil
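The remaining hunks in this file are plumbing: sendMessage, spawnPeerHandler, peerHandler, and resendPendingMsgs each gain a leading ctx parameter so the context derived in Start reaches every spawned goroutine. The payoff is that a long-lived worker can select on ctx.Done() in addition to the quit channel; extending the lifecycle sketch above (again a distilled stand-in, not the real peerHandler):

// worker is a distilled stand-in for peerHandler: a long-lived goroutine,
// tracked by the WaitGroup, that exits on either shutdown signal.
func (l *lifecycle) worker(ctx context.Context, msgs <-chan string) {
	defer l.wg.Done()

	for {
		select {
		case msg := <-msgs:
			_ = msg // deliver msg to the peer here

		case <-ctx.Done():
			return // context from Start was cancelled via Stop

		case <-l.quit:
			return // legacy quit-channel shutdown path
		}
	}
}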

discovery/reliable_sender_test.go

@@ -1,6 +1,7 @@
 package discovery

 import (
+	"context"
 	"fmt"
 	"sync/atomic"
 	"testing"
@@ -11,6 +12,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/stretchr/testify/require"
 )

 // newTestReliableSender creates a new reliable sender instance used for
@@ -32,7 +34,7 @@ func newTestReliableSender(t *testing.T) *reliableSender {
 			return c
 		},
 		MessageStore: newMockMessageStore(),
-		IsMsgStale: func(lnwire.Message) bool {
+		IsMsgStale: func(context.Context, lnwire.Message) bool {
 			return false
 		},
 	}
@@ -69,6 +71,7 @@ func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
 // a peer while taking into account its connection lifecycle works as expected.
 func TestReliableSenderFlow(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

 	reliableSender := newTestReliableSender(t)
@@ -98,9 +101,8 @@ func TestReliableSenderFlow(t *testing.T) {
 	msg1 := randChannelUpdate()
 	var peerPubKey [33]byte
 	copy(peerPubKey[:], pubKey.SerializeCompressed())
-	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err := reliableSender.sendMessage(ctx, msg1, peerPubKey)
+	require.NoError(t, err)

 	// Since there isn't a peerHandler for this peer currently active due to
 	// this being the first message being sent reliably, we should expect to
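The test edits are mechanical: each inline `if err := ...; err != nil { t.Fatalf(...) }` block collapses into testify's require.NoError, which prints the error and halts the test just as t.Fatalf does. For instance (with a hypothetical helper standing in for sendMessage):

package discovery

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// sendSomething is a hypothetical stand-in for sendMessage.
func sendSomething() error { return nil }

func TestRequireStyle(t *testing.T) {
	// Old style, three lines per checked call:
	//
	//	if err := sendSomething(); err != nil {
	//		t.Fatalf("unable to reliably send message: %v", err)
	//	}
	//
	// New style: require.NoError reports err and calls t.FailNow, so a
	// failure stops the test exactly like t.Fatalf did.
	err := sendSomething()
	require.NoError(t, err)
}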
@@ -114,9 +116,8 @@ func TestReliableSenderFlow(t *testing.T) {

 	// We'll then attempt to send another additional message reliably.
 	msg2 := randAnnounceSignatures()
-	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg2, peerPubKey)
+	require.NoError(t, err)

 	// This should not however request another peer online notification as
 	// the peerHandler has already been started and is waiting for the
@@ -145,9 +146,8 @@ func TestReliableSenderFlow(t *testing.T) {

 	// Then, we'll send one more message reliably.
 	msg3 := randChannelUpdate()
-	if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg3, peerPubKey)
+	require.NoError(t, err)

 	// Again, this should not request another peer online notification
 	// request since we are currently waiting for the peer to be offline.
@@ -188,6 +188,7 @@ func TestReliableSenderFlow(t *testing.T) {
 // them as stale.
 func TestReliableSenderStaleMessages(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

 	reliableSender := newTestReliableSender(t)
@@ -206,7 +207,9 @@ func TestReliableSenderStaleMessages(t *testing.T) {

 	// We'll also override IsMsgStale to mark all messages as stale as we're
 	// interested in testing the stale message behavior.
-	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+	reliableSender.cfg.IsMsgStale = func(_ context.Context,
+		_ lnwire.Message) bool {
+
 		return true
 	}
@@ -215,9 +218,8 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	msg1 := randAnnounceSignatures()
 	var peerPubKey [33]byte
 	copy(peerPubKey[:], pubKey.SerializeCompressed())
-	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err := reliableSender.sendMessage(ctx, msg1, peerPubKey)
+	require.NoError(t, err)

 	// Since there isn't a peerHandler for this peer currently active due to
 	// this being the first message being sent reliably, we should expect to
@@ -245,7 +247,7 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	// message store since it is seen as stale and has been sent at least
 	// once. Once the message is removed, the peerHandler should be torn
 	// down as there are no longer any pending messages within the store.
-	err := wait.NoError(func() error {
+	err = wait.NoError(func() error {
 		msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
 			peerPubKey,
 		)
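For readers unfamiliar with lnd's test helpers: wait.NoError from the lntest/wait package polls the given closure until it returns nil or the timeout lapses, which is how this test tolerates the peerHandler's asynchronous teardown. A self-contained usage sketch (the values here are hypothetical):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/lntest/wait"
)

func main() {
	ready := time.Now().Add(100 * time.Millisecond)

	// Poll until the closure returns nil, or fail after the timeout.
	err := wait.NoError(func() error {
		if time.Now().Before(ready) {
			return errors.New("not ready yet")
		}
		return nil
	}, time.Second)

	fmt.Println("result:", err) // result: <nil>
}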
@@ -265,14 +267,15 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	}

 	// Override IsMsgStale to no longer mark messages as stale.
-	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+	reliableSender.cfg.IsMsgStale = func(_ context.Context,
+		_ lnwire.Message) bool {
+
 		return false
 	}

 	// We'll request the message to be sent reliably.
-	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg2, peerPubKey)
+	require.NoError(t, err)

 	// We should see an online notification request indicating that a new
 	// peerHandler has been spawned since it was previously torn down.