diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index ee74a5938..ac95d55ba 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -602,11 +602,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 		NotifyWhenOnline:  cfg.NotifyWhenOnline,
 		NotifyWhenOffline: cfg.NotifyWhenOffline,
 		MessageStore:      cfg.MessageStore,
-		IsMsgStale: func(message lnwire.Message) bool {
-			ctx := context.TODO()
-
-			return gossiper.isMsgStale(ctx, message)
-		},
+		IsMsgStale:        gossiper.isMsgStale,
 	})
 
 	return gossiper
@@ -678,7 +674,7 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error {
 	// Start the reliable sender. In case we had any pending messages ready
 	// to be sent when the gossiper was last shut down, we must continue on
 	// our quest to deliver them to their respective peers.
-	if err := d.reliableSender.Start(); err != nil {
+	if err := d.reliableSender.Start(ctx); err != nil {
 		return err
 	}
 
@@ -1889,7 +1885,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
 			edgeInfo.Info, chanUpdate.ChannelFlags,
 		)
 		err := d.reliableSender.sendMessage(
-			chanUpdate, remotePubKey,
+			ctx, chanUpdate, remotePubKey,
 		)
 		if err != nil {
 			log.Errorf("Unable to reliably send %v for "+
@@ -3333,7 +3329,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
 		// Now we'll attempt to send the channel update message
 		// reliably to the remote peer in the background, so that we
 		// don't block if the peer happens to be offline at the moment.
-		err := d.reliableSender.sendMessage(upd, remotePubKey)
+		err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
 		if err != nil {
 			err := fmt.Errorf("unable to reliably send %v for "+
 				"channel=%v to peer=%x: %v", upd.MsgType(),
@@ -3470,7 +3466,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
 		// Since the remote peer might not be online we'll call a
 		// method that will attempt to deliver the proof when it comes
 		// online.
-		err := d.reliableSender.sendMessage(ann, remotePubKey)
+		err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
 		if err != nil {
 			err := fmt.Errorf("unable to reliably send %v for "+
 				"channel=%v to peer=%x: %v", ann.MsgType(),
diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go
index b4d32e73f..57f2f28ff 100644
--- a/discovery/reliable_sender.go
+++ b/discovery/reliable_sender.go
@@ -1,8 +1,10 @@
 package discovery
 
 import (
+	"context"
 	"sync"
 
+	"github.com/lightningnetwork/lnd/fn/v2"
 	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
@@ -28,7 +30,7 @@ type reliableSenderCfg struct {
 
 	// IsMsgStale determines whether a message retrieved from the backing
 	// MessageStore is seen as stale by the current graph.
-	IsMsgStale func(lnwire.Message) bool
+	IsMsgStale func(context.Context, lnwire.Message) bool
 }
 
 // peerManager contains the set of channels required for the peerHandler to
@@ -59,8 +61,9 @@ type reliableSender struct {
 	activePeers    map[[33]byte]peerManager
	activePeersMtx sync.Mutex
 
-	wg   sync.WaitGroup
-	quit chan struct{}
+	wg     sync.WaitGroup
+	quit   chan struct{}
+	cancel fn.Option[context.CancelFunc]
 }
 
 // newReliableSender returns a new reliableSender backed by the given config.
@@ -73,10 +76,13 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
 }
 
 // Start spawns message handlers for any peers with pending messages.
-func (s *reliableSender) Start() error {
+func (s *reliableSender) Start(ctx context.Context) error {
 	var err error
 	s.start.Do(func() {
-		err = s.resendPendingMsgs()
+		ctx, cancel := context.WithCancel(ctx)
+		s.cancel = fn.Some(cancel)
+
+		err = s.resendPendingMsgs(ctx)
 	})
 	return err
 }
@@ -87,6 +93,7 @@ func (s *reliableSender) Stop() {
 		log.Debugf("reliableSender is stopping")
 		defer log.Debugf("reliableSender stopped")
 
+		s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
 		close(s.quit)
 		s.wg.Wait()
 	})
@@ -96,7 +103,9 @@ func (s *reliableSender) Stop() {
 // event that the peer is currently offline, this will only write the message to
 // disk. Once the peer reconnects, this message, along with any others pending,
 // will be sent to the peer.
-func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
+func (s *reliableSender) sendMessage(ctx context.Context, msg lnwire.Message,
+	peerPubKey [33]byte) error {
+
 	// We'll start by persisting the message to disk. This allows us to
 	// resend the message upon restarts and peer reconnections.
 	if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
@@ -106,7 +115,7 @@ func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) er
 	// Then, we'll spawn a peerHandler for this peer to handle resending its
 	// pending messages while taking into account its connection lifecycle.
 spawnHandler:
-	msgHandler, ok := s.spawnPeerHandler(peerPubKey)
+	msgHandler, ok := s.spawnPeerHandler(ctx, peerPubKey)
 
 	// If the handler wasn't previously active, we can exit now as we know
 	// that the message will be sent once the peer online notification is
@@ -134,7 +143,7 @@ spawnHandler:
 // spawnPeerHandler spawns a peerHandler for the given peer if there isn't
 // one already active. The boolean returned signals whether there was already
 // one active or not.
-func (s *reliableSender) spawnPeerHandler(
+func (s *reliableSender) spawnPeerHandler(ctx context.Context,
 	peerPubKey [33]byte) (peerManager, bool) {
 
 	s.activePeersMtx.Lock()
@@ -152,7 +161,7 @@ func (s *reliableSender) spawnPeerHandler(
 	// peerHandler.
 	if !ok {
 		s.wg.Add(1)
-		go s.peerHandler(msgHandler, peerPubKey)
+		go s.peerHandler(ctx, msgHandler, peerPubKey)
 	}
 
 	return msgHandler, ok
@@ -164,7 +173,9 @@ func (s *reliableSender) spawnPeerHandler(
 // offline will be queued and sent once the peer reconnects.
 //
 // NOTE: This must be run as a goroutine.
-func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
+func (s *reliableSender) peerHandler(ctx context.Context, peerMgr peerManager,
+	peerPubKey [33]byte) {
+
 	defer s.wg.Done()
 
 	// We'll start by requesting a notification for when the peer
@@ -252,7 +263,7 @@ out:
 		// check whether it's stale. This guarantees that
 		// AnnounceSignatures are sent at least once if we happen to
 		// already have signatures for both parties.
-		if s.cfg.IsMsgStale(msg) {
+		if s.cfg.IsMsgStale(ctx, msg) {
 			err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
 			if err != nil {
 				log.Errorf("Unable to remove stale %v message "+
@@ -321,7 +332,7 @@ out:
 
 // resendPendingMsgs retrieves and sends all of the messages within the message
 // store that should be reliably sent to their respective peers.
-func (s *reliableSender) resendPendingMsgs() error {
+func (s *reliableSender) resendPendingMsgs(ctx context.Context) error {
 	// Fetch all of the peers for which we have pending messages and
 	// spawn a peerHandler for each. Once the peer is seen as online, all
 	// of the pending messages will be sent.
@@ -331,7 +342,7 @@ func (s *reliableSender) resendPendingMsgs() error {
 	}
 
 	for peer := range peers {
-		s.spawnPeerHandler(peer)
+		s.spawnPeerHandler(ctx, peer)
 	}
 
 	return nil
diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go
index 19fdaa1ca..fc94d57f3 100644
--- a/discovery/reliable_sender_test.go
+++ b/discovery/reliable_sender_test.go
@@ -1,6 +1,7 @@
 package discovery
 
 import (
+	"context"
 	"fmt"
 	"sync/atomic"
 	"testing"
@@ -11,6 +12,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/stretchr/testify/require"
 )
 
 // newTestReliableSender creates a new reliable sender instance used for
@@ -32,7 +34,7 @@ func newTestReliableSender(t *testing.T) *reliableSender {
 			return c
 		},
 		MessageStore: newMockMessageStore(),
-		IsMsgStale: func(lnwire.Message) bool {
+		IsMsgStale: func(context.Context, lnwire.Message) bool {
 			return false
 		},
 	}
@@ -69,6 +71,7 @@ func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
 // a peer while taking into account its connection lifecycle works as expected.
 func TestReliableSenderFlow(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	reliableSender := newTestReliableSender(t)
 
@@ -98,9 +101,8 @@ func TestReliableSenderFlow(t *testing.T) {
 	msg1 := randChannelUpdate()
 	var peerPubKey [33]byte
 	copy(peerPubKey[:], pubKey.SerializeCompressed())
-	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err := reliableSender.sendMessage(ctx, msg1, peerPubKey)
+	require.NoError(t, err)
 
 	// Since there isn't a peerHandler for this peer currently active due to
 	// this being the first message being sent reliably, we should expect to
@@ -114,9 +116,8 @@ func TestReliableSenderFlow(t *testing.T) {
 
 	// We'll then attempt to send an additional message reliably.
 	msg2 := randAnnounceSignatures()
-	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg2, peerPubKey)
+	require.NoError(t, err)
 
 	// This should not, however, request another peer online notification as
 	// the peerHandler has already been started and is waiting for the
@@ -145,9 +146,8 @@ func TestReliableSenderFlow(t *testing.T) {
 
 	// Then, we'll send one more message reliably.
 	msg3 := randChannelUpdate()
-	if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg3, peerPubKey)
+	require.NoError(t, err)
 
 	// Again, this should not trigger another peer online notification
 	// request since we are currently waiting for the peer to be offline.
@@ -188,6 +188,7 @@
 // them as stale.
 func TestReliableSenderStaleMessages(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	reliableSender := newTestReliableSender(t)
 
@@ -206,7 +207,9 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 
 	// We'll also override IsMsgStale to mark all messages as stale as we're
 	// interested in testing the stale message behavior.
-	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+	reliableSender.cfg.IsMsgStale = func(_ context.Context,
+		_ lnwire.Message) bool {
+
 		return true
 	}
 
@@ -215,9 +218,8 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	msg1 := randAnnounceSignatures()
 	var peerPubKey [33]byte
 	copy(peerPubKey[:], pubKey.SerializeCompressed())
-	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err := reliableSender.sendMessage(ctx, msg1, peerPubKey)
+	require.NoError(t, err)
 
 	// Since there isn't a peerHandler for this peer currently active due to
 	// this being the first message being sent reliably, we should expect to
@@ -245,7 +247,7 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	// message store since it is seen as stale and has been sent at least
 	// once. Once the message is removed, the peerHandler should be torn
 	// down as there are no longer any pending messages within the store.
-	err := wait.NoError(func() error {
+	err = wait.NoError(func() error {
 		msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
 			peerPubKey,
 		)
@@ -265,14 +267,15 @@ func TestReliableSenderStaleMessages(t *testing.T) {
 	}
 
 	// Override IsMsgStale to no longer mark messages as stale.
-	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+	reliableSender.cfg.IsMsgStale = func(_ context.Context,
+		_ lnwire.Message) bool {
+
 		return false
 	}
 
 	// We'll request the message to be sent reliably.
-	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
-		t.Fatalf("unable to reliably send message: %v", err)
-	}
+	err = reliableSender.sendMessage(ctx, msg2, peerPubKey)
+	require.NoError(t, err)
 
 	// We should see an online notification request indicating that a new
 	// peerHandler has been spawned since it was previously torn down.
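The shutdown pattern this patch adopts can be seen in isolation below: Start derives a cancellable child context from the caller's context and stashes the cancel function, and Stop invokes that cancel before closing the quit channel and waiting on the WaitGroup, so goroutines blocked on context-aware calls unblock promptly. This is a minimal standalone sketch under stated assumptions, not lnd code: the worker type and its names are invented for illustration, and a plain context.CancelFunc field guarded by a nil check stands in for lnd's fn.Option[context.CancelFunc].

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker mimics the reliableSender lifecycle: Start wires up a child
// context, Stop tears it down before waiting for goroutines to exit.
// The type and field names here are hypothetical, for illustration only.
type worker struct {
	start  sync.Once
	stop   sync.Once
	wg     sync.WaitGroup
	quit   chan struct{}
	cancel context.CancelFunc // set by Start; nil until then
}

func newWorker() *worker {
	return &worker{quit: make(chan struct{})}
}

// Start derives a cancellable child context so that Stop can abort
// in-flight work even while the parent context remains alive.
func (w *worker) Start(ctx context.Context) error {
	w.start.Do(func() {
		ctx, cancel := context.WithCancel(ctx)
		w.cancel = cancel

		w.wg.Add(1)
		go w.loop(ctx)
	})

	return nil
}

// Stop cancels the child context first, then closes the legacy quit
// channel and waits for every goroutine to finish.
func (w *worker) Stop() {
	w.stop.Do(func() {
		if w.cancel != nil {
			w.cancel()
		}

		close(w.quit)
		w.wg.Wait()
	})
}

// loop honors both shutdown signals, which is what lets a codebase
// migrate from quit channels to contexts one component at a time.
func (w *worker) loop(ctx context.Context) {
	defer w.wg.Done()

	for {
		select {
		case <-time.After(50 * time.Millisecond):
			fmt.Println("tick")
		case <-ctx.Done():
			return
		case <-w.quit:
			return
		}
	}
}

func main() {
	w := newWorker()
	_ = w.Start(context.Background())
	time.Sleep(120 * time.Millisecond)
	w.Stop() // cancel unblocks loop, then wg.Wait returns
}

Keeping both the ctx.Done and quit cases in the select mirrors the patch itself, which cancels the context and still closes quit in Stop; the quit channel could only be dropped once every select in the sender watches the context.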