diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 6232a92ce..ab12524fb 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -857,10 +857,8 @@ func (d *AuthenticatedGossiper) stop() {
 // then added to a queue for batched trickled announcement to all connected
 // peers. Remote channel announcements should contain the announcement proof
 // and be fully validated.
-func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
-	peer lnpeer.Peer) chan error {
-
-	ctx := context.TODO()
+func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
+	msg lnwire.Message, peer lnpeer.Peer) chan error {
 
 	log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
 
@@ -950,8 +948,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 
 	// If the peer that sent us this error is quitting, then we don't need
 	// to send back an error and can return immediately.
+	// TODO(elle): the peer should now just rely on canceling the passed
+	// context.
 	case <-peer.QuitSignal():
 		return nil
+	case <-ctx.Done():
+		return nil
 	case <-d.quit:
 		nMsg.err <- ErrGossiperShuttingDown
 	}
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index b3ca738eb..ef7f2f21f 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -1020,9 +1020,10 @@ func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) (
 // the router subsystem.
 func TestProcessAnnouncement(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	timestamp := testTimestamp
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
 	assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) {
@@ -1038,11 +1039,11 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// First, we'll craft a valid remote channel announcement and send it to
 	// the gossiper so that it can be processed.
-	ca, err := ctx.createRemoteChannelAnnouncement(0)
+	ca, err := tCtx.createRemoteChannelAnnouncement(0)
 	require.NoError(t, err, "can't create channel announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(ctx, ca, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("remote announcement not processed")
 	}
@@ -1051,13 +1052,13 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// The announcement should be broadcast and included in our local view
 	// of the graph.
 	select {
-	case msg := <-ctx.broadcastedMessage:
+	case msg := <-tCtx.broadcastedMessage:
 		assertSenderExistence(nodePeer.IdentityKey(), msg)
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("announcement wasn't proceeded")
 	}
 
-	if len(ctx.router.infos) != 1 {
+	if len(tCtx.router.infos) != 1 {
 		t.Fatalf("edge wasn't added to router: %v", err)
 	}
 
@@ -1068,7 +1069,7 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// We send an invalid channel update and expect it to fail.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(ctx, ua, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("remote announcement not processed")
 	}
@@ -1077,7 +1078,7 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// We should not broadcast the channel update.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("gossiper should not have broadcast channel update")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1088,7 +1089,7 @@ func TestProcessAnnouncement(t *testing.T) {
 	require.NoError(t, err, "can't create update announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(ctx, ua, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("remote announcement not processed")
 	}
@@ -1096,13 +1097,13 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// The channel policy should be broadcast to the rest of the network.
 	select {
-	case msg := <-ctx.broadcastedMessage:
+	case msg := <-tCtx.broadcastedMessage:
 		assertSenderExistence(nodePeer.IdentityKey(), msg)
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("announcement wasn't proceeded")
 	}
 
-	if len(ctx.router.edges) != 1 {
+	if len(tCtx.router.edges) != 1 {
 		t.Fatalf("edge update wasn't added to router: %v", err)
 	}
@@ -1111,7 +1112,7 @@ func TestProcessAnnouncement(t *testing.T) {
 	require.NoError(t, err, "can't create node announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(ctx, na, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("remote announcement not processed")
 	}
@@ -1120,13 +1121,13 @@ func TestProcessAnnouncement(t *testing.T) {
 
 	// It should also be broadcast to the network and included in our local
 	// view of the graph.
 	select {
-	case msg := <-ctx.broadcastedMessage:
+	case msg := <-tCtx.broadcastedMessage:
 		assertSenderExistence(nodePeer.IdentityKey(), msg)
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("announcement wasn't proceeded")
 	}
 
-	if len(ctx.router.nodes) != 1 {
+	if len(tCtx.router.nodes) != 1 {
 		t.Fatalf("node wasn't added to router: %v", err)
 	}
 }
@@ -1135,10 +1136,11 @@ func TestProcessAnnouncement(t *testing.T) {
 // propagated to the router subsystem.
 func TestPrematureAnnouncement(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	timestamp := testTimestamp
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
 	_, err = createNodeAnnouncement(remoteKeyPriv1, timestamp)
@@ -1150,18 +1152,18 @@ func TestPrematureAnnouncement(t *testing.T) {
 	// remote side, but block height of this announcement is greater than
 	// highest know to us, for that reason it should be ignored and not
 	// added to the router.
-	ca, err := ctx.createRemoteChannelAnnouncement(
+	ca, err := tCtx.createRemoteChannelAnnouncement(
 		1, withFundingTxPrep(fundingTxPrepTypeNone),
 	)
 	require.NoError(t, err, "can't create channel announcement")
 
 	select {
-	case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
+	case <-tCtx.gossiper.ProcessRemoteAnnouncement(ctx, ca, nodePeer):
 	case <-time.After(time.Second):
 		t.Fatal("announcement was not processed")
 	}
 
-	if len(ctx.router.infos) != 0 {
+	if len(tCtx.router.infos) != 0 {
 		t.Fatal("edge was added to router")
 	}
 }
@@ -1170,69 +1172,70 @@ func TestPrematureAnnouncement(t *testing.T) {
 // properly processes partial and fully announcement signatures message.
 func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
 	// Set up a channel that we can use to inspect the messages sent
 	// directly from the gossiper.
 	sentMsgs := make(chan lnwire.Message, 10)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
-		peerChan chan<- lnpeer.Peer) {
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(
+		target [33]byte, peerChan chan<- lnpeer.Peer) {
 
 		pk, _ := btcec.ParsePubKey(target[:])
 
 		select {
 		case peerChan <- &mockPeer{
-			pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{},
+			pk, sentMsgs, tCtx.gossiper.quit, atomic.Bool{},
 		}:
-		case <-ctx.gossiper.quit:
+		case <-tCtx.gossiper.quit:
 		}
 	}
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
 	require.NoError(t, err, "unable to parse pubkey")
 	remotePeer := &mockPeer{
-		remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentMsgs, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process channel ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1248,29 +1251,29 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.nodeAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1278,20 +1281,22 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process local proof")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("announcements were broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	number := 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1308,8 +1313,8 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
@@ -1318,14 +1323,14 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 
 	for i := 0; i < 5; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
 	}
 
 	number = 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1346,33 +1351,34 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
 // processes announcement with unknown channel ids.
 func TestOrphanSignatureAnnouncement(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
 	// Set up a channel that we can use to inspect the messages sent
 	// directly from the gossiper.
 	sentMsgs := make(chan lnwire.Message, 10)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
-		peerChan chan<- lnpeer.Peer) {
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(
+		target [33]byte, peerChan chan<- lnpeer.Peer) {
 
 		pk, _ := btcec.ParsePubKey(target[:])
 
 		select {
 		case peerChan <- &mockPeer{
-			pk, sentMsgs, ctx.gossiper.quit, atomic.Bool{},
+			pk, sentMsgs, tCtx.gossiper.quit, atomic.Bool{},
 		}:
-		case <-ctx.gossiper.quit:
+		case <-tCtx.gossiper.quit:
 		}
 	}
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
 	require.NoError(t, err, "unable to parse pubkey")
 	remotePeer := &mockPeer{
-		remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentMsgs, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Pretending that we receive local channel announcement from funding
@@ -1380,15 +1386,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	// this case the announcement should be added in the orphan batch
 	// because we haven't announce the channel yet.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to proceed announcement")
 
 	number := 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1407,7 +1414,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
@@ -1415,32 +1422,32 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	require.NoError(t, err, "unable to process")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1456,28 +1463,29 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.nodeAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1485,7 +1493,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	// After that we process local announcement, and waiting to receive
 	// the channel announcement.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -1503,14 +1513,14 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 	// should be broadcasting the final channel announcements.
 	for i := 0; i < 5; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
 	}
 
 	number = 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(p *channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1533,11 +1543,12 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
 // assembled.
 func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -1546,7 +1557,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// Set up a channel to intercept the messages sent to the remote peer.
 	sentToPeer := make(chan lnwire.Message, 1)
 	remotePeer := &mockPeer{
-		remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentToPeer, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Since the reliable send to the remote peer of the local channel proof
@@ -1554,7 +1565,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// channel through which it gets sent to control exactly when to
 	// dispatch it.
 	notifyPeers := make(chan chan<- lnpeer.Peer, 1)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
 		connectedChan chan<- lnpeer.Peer) {
 		notifyPeers <- connectedChan
 	}
@@ -1562,13 +1573,13 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process channel ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1576,7 +1587,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.localProofAnn,
 	):
 	case <-time.After(2 * time.Second):
@@ -1598,7 +1609,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// The proof should not be broadcast yet since we're still missing the
 	// remote party's.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("announcements were broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1611,7 +1622,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	}
 
 	number := 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1630,7 +1641,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// Restart the gossiper and restore its original NotifyWhenOnline and
 	// NotifyWhenOffline methods. This should trigger a new attempt to send
 	// the message to the peer.
-	ctx.gossiper.Stop()
+	require.NoError(t, tCtx.gossiper.Stop())
 	isAlias := func(lnwire.ShortChannelID) bool {
 		return false
 	}
@@ -1654,19 +1665,19 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 
 	//nolint:ll
 	gossiper := New(Config{
-		Notifier:               ctx.gossiper.cfg.Notifier,
-		Broadcast:              ctx.gossiper.cfg.Broadcast,
-		NotifyWhenOnline:       ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
-		NotifyWhenOffline:      ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
-		FetchSelfAnnouncement:  ctx.gossiper.cfg.FetchSelfAnnouncement,
-		UpdateSelfAnnouncement: ctx.gossiper.cfg.UpdateSelfAnnouncement,
-		Graph:                  ctx.gossiper.cfg.Graph,
+		Notifier:               tCtx.gossiper.cfg.Notifier,
+		Broadcast:              tCtx.gossiper.cfg.Broadcast,
+		NotifyWhenOnline:       tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline,
+		NotifyWhenOffline:      tCtx.gossiper.reliableSender.cfg.NotifyWhenOffline,
+		FetchSelfAnnouncement:  tCtx.gossiper.cfg.FetchSelfAnnouncement,
+		UpdateSelfAnnouncement: tCtx.gossiper.cfg.UpdateSelfAnnouncement,
+		Graph:                  tCtx.gossiper.cfg.Graph,
 		TrickleDelay:           trickleDelay,
 		RetransmitTicker:       ticker.NewForce(retransmitDelay),
 		RebroadcastInterval:    rebroadcastInterval,
 		ProofMatureDelta:       proofMatureDelta,
-		WaitingProofStore:      ctx.gossiper.cfg.WaitingProofStore,
-		MessageStore:           ctx.gossiper.cfg.MessageStore,
+		WaitingProofStore:      tCtx.gossiper.cfg.WaitingProofStore,
+		MessageStore:           tCtx.gossiper.cfg.MessageStore,
 		RotateTicker:           ticker.NewForce(DefaultSyncerRotationInterval),
 		HistoricalSyncTicker:   ticker.NewForce(DefaultHistoricalSyncInterval),
 		NumActiveSyncers:       3,
@@ -1677,8 +1688,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 		FindBaseByAlias:        findBaseByAlias,
 		GetAlias:               getAlias,
 	}, &keychain.KeyDescriptor{
-		PubKey:     ctx.gossiper.selfKey,
-		KeyLocator: ctx.gossiper.selfKeyLoc,
+		PubKey:     tCtx.gossiper.selfKey,
+		KeyLocator: tCtx.gossiper.selfKeyLoc,
 	})
 	require.NoError(t, err, "unable to recreate gossiper")
 	if err := gossiper.Start(context.Background()); err != nil {
@@ -1690,8 +1701,8 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// broadcast.
 	gossiper.syncMgr.markGraphSynced()
 
-	ctx.gossiper = gossiper
-	remotePeer.quit = ctx.gossiper.quit
+	tCtx.gossiper = gossiper
+	remotePeer.quit = tCtx.gossiper.quit
 
 	// After starting up, the gossiper will see that it has a proof in the
 	// WaitingProofStore, and will retry sending its part to the remote.
@@ -1729,8 +1740,8 @@ out:
 	// Now exchanging the remote channel proof, the channel announcement
 	// broadcast should continue as normal.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
@@ -1740,13 +1751,13 @@ out:
 	}
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 	case <-time.After(time.Second):
 		t.Fatal("announcement wasn't broadcast")
 	}
 
 	number = 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1768,11 +1779,12 @@ out:
 // the full proof (ChannelAnnouncement) to the remote peer.
 func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -1782,12 +1794,12 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	// gossiper to the remote peer.
 	sentToPeer := make(chan lnwire.Message, 1)
 	remotePeer := &mockPeer{
-		remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentToPeer, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Override NotifyWhenOnline to return the remote peer which we expect
 	// meesages to be sent to.
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
 		peerChan chan<- lnpeer.Peer) {
 
 		peerChan <- remotePeer
@@ -1796,7 +1808,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.chanAnn,
 	):
 	case <-time.After(2 * time.Second):
@@ -1804,13 +1816,13 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process channel ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.chanUpdAnn1,
 	):
 	case <-time.After(2 * time.Second):
@@ -1818,7 +1830,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1831,7 +1843,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.nodeAnn1,
 	):
 	case <-time.After(2 * time.Second):
@@ -1841,35 +1853,34 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 		t.Fatalf("unable to process node ann:%v", err)
 	}
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
-
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.nodeAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -1877,7 +1888,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.localProofAnn,
 	):
 	case <-time.After(2 * time.Second):
@@ -1886,8 +1897,8 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	require.NoError(t, err, "unable to process local proof")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
@@ -1905,14 +1916,14 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	// All channel and node announcements should be broadcast.
 	for i := 0; i < 5; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
 	}
 
 	number := 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -1931,8 +1942,8 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
 	// Now give the gossiper the remote proof yet again. This should
 	// trigger a send of the full ChannelAnnouncement.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
@@ -2210,25 +2221,26 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
 // announcements for nodes who do not intend to publicly advertise themselves.
 func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	const (
 		startingHeight = 100
 		timestamp      = 123456
 	)
 
-	ctx, err := createTestCtx(t, startingHeight, false)
+	tCtx, err := createTestCtx(t, startingHeight, false)
 	require.NoError(t, err, "can't create context")
 
 	// We'll start off by processing a channel announcement without a proof
 	// (i.e., an unadvertised channel), followed by a node announcement for
 	// this same channel announcement.
-	chanAnn := ctx.createAnnouncementWithoutProof(
+	chanAnn := tCtx.createAnnouncementWithoutProof(
 		startingHeight-2, selfKeyDesc.PubKey, remoteKeyPub1,
 	)
 
 	pubKey := remoteKeyPriv1.PubKey()
 	select {
-	case err := <-ctx.gossiper.ProcessLocalAnnouncement(chanAnn):
+	case err := <-tCtx.gossiper.ProcessLocalAnnouncement(chanAnn):
 		if err != nil {
 			t.Fatalf("unable to process local announcement: %v", err)
 		}
@@ -2239,7 +2251,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 
 	// The gossiper should not broadcast the announcement due to it not
 	// having its announcement signatures.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("gossiper should not have broadcast channel announcement")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -2248,7 +2260,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	require.NoError(t, err, "unable to create node announcement")
 
 	select {
-	case err := <-ctx.gossiper.ProcessLocalAnnouncement(nodeAnn):
+	case err := <-tCtx.gossiper.ProcessLocalAnnouncement(nodeAnn):
 		if err != nil {
 			t.Fatalf("unable to process remote announcement: %v", err)
 		}
@@ -2259,7 +2271,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 
 	// The gossiper should also not broadcast the node announcement due to
 	// it not being part of any advertised channels.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("gossiper should not have broadcast node announcement")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -2268,14 +2280,16 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	// by opening a public channel on the network. We'll create a
 	// ChannelAnnouncement and hand it off to the gossiper in order to
 	// process it.
-	remoteChanAnn, err := ctx.createRemoteChannelAnnouncement(
+	remoteChanAnn, err := tCtx.createRemoteChannelAnnouncement(
 		startingHeight - 1,
 	)
 	require.NoError(t, err, "unable to create remote channel announcement")
 
 	peer := &mockPeer{pubKey, nil, nil, atomic.Bool{}}
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(remoteChanAnn, peer):
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, remoteChanAnn, peer,
+	):
 		if err != nil {
 			t.Fatalf("unable to process remote announcement: %v", err)
 		}
@@ -2284,7 +2298,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	}
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("gossiper should have broadcast the channel announcement")
 	}
@@ -2295,7 +2309,9 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	require.NoError(t, err, "unable to create node announcement")
 
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(nodeAnn, peer):
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, nodeAnn, peer,
+	):
 		if err != nil {
 			t.Fatalf("unable to process remote announcement: %v", err)
 		}
@@ -2304,7 +2320,7 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 	}
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("gossiper should have broadcast the node announcement")
 	}
@@ -2314,13 +2330,14 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) {
 // zombie edges.
 func TestRejectZombieEdge(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	// We'll start by creating our test context with a batch of
 	// announcements.
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "unable to create test context")
 
-	batch, err := ctx.createRemoteAnnouncements(0)
+	batch, err := tCtx.createRemoteAnnouncements(0)
 	require.NoError(t, err, "unable to create announcements")
 
 	remotePeer := &mockPeer{pk: remoteKeyPriv2.PubKey()}
@@ -2330,8 +2347,8 @@ func TestRejectZombieEdge(t *testing.T) {
 	processAnnouncements := func(isZombie bool) {
 		t.Helper()
 
-		errChan := ctx.gossiper.ProcessRemoteAnnouncement(
-			batch.chanAnn, remotePeer,
+		errChan := tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, batch.chanAnn, remotePeer,
 		)
 		select {
 		case err := <-errChan:
@@ -2347,7 +2364,7 @@ func TestRejectZombieEdge(t *testing.T) {
 			t.Fatal("expected to process channel announcement")
 		}
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 			if isZombie {
 				t.Fatal("expected to not broadcast zombie " +
 					"channel announcement")
@@ -2359,8 +2376,8 @@ func TestRejectZombieEdge(t *testing.T) {
 			}
 		}
 
-		errChan = ctx.gossiper.ProcessRemoteAnnouncement(
-			batch.chanUpdAnn2, remotePeer,
+		errChan = tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, batch.chanUpdAnn2, remotePeer,
 		)
 		select {
 		case err := <-errChan:
@@ -2376,7 +2393,7 @@ func TestRejectZombieEdge(t *testing.T) {
 			t.Fatal("expected to process channel update")
 		}
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 			if isZombie {
 				t.Fatal("expected to not broadcast zombie " +
 					"channel update")
@@ -2393,7 +2410,7 @@ func TestRejectZombieEdge(t *testing.T) {
 	// zombie within the router. This should reject any announcements for
 	// this edge while it remains as a zombie.
 	chanID := batch.chanAnn.ShortChannelID
-	err = ctx.router.MarkEdgeZombie(
+	err = tCtx.router.MarkEdgeZombie(
 		chanID, batch.chanAnn.NodeID1, batch.chanAnn.NodeID2,
 	)
 	if err != nil {
@@ -2404,7 +2421,7 @@ func TestRejectZombieEdge(t *testing.T) {
 
 	// If we then mark the edge as live, the edge's zombie status should be
 	// overridden and the announcements should be processed.
-	if err := ctx.router.MarkEdgeLive(chanID); err != nil {
+	if err := tCtx.router.MarkEdgeLive(chanID); err != nil {
 		t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
 	}
 
@@ -2415,13 +2432,14 @@ func TestRejectZombieEdge(t *testing.T) {
 // becomes live by receiving a fresh update.
 func TestProcessZombieEdgeNowLive(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	// We'll start by creating our test context with a batch of
 	// announcements.
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "unable to create test context")
 
-	batch, err := ctx.createRemoteAnnouncements(0)
+	batch, err := tCtx.createRemoteAnnouncements(0)
 	require.NoError(t, err, "unable to create announcements")
 
 	remotePeer := &mockPeer{pk: remoteKeyPriv1.PubKey()}
@@ -2435,8 +2453,8 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 	processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) {
 		t.Helper()
 
-		errChan := ctx.gossiper.ProcessRemoteAnnouncement(
-			ann, remotePeer,
+		errChan := tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, ann, remotePeer,
 		)
 
 		var err error
@@ -2454,7 +2472,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 		}
 
 		select {
-		case msgWithSenders := <-ctx.broadcastedMessage:
+		case msgWithSenders := <-tCtx.broadcastedMessage:
 			if isZombie {
 				t.Fatal("expected to not broadcast zombie " +
 					"channel message")
@@ -2483,7 +2501,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 	// want to allow a new update from the second node to allow the entire
 	// edge to be resurrected.
 	chanID := batch.chanAnn.ShortChannelID
-	err = ctx.router.MarkEdgeZombie(
+	err = tCtx.router.MarkEdgeZombie(
 		chanID, [33]byte{}, batch.chanAnn.NodeID2,
 	)
 	if err != nil {
@@ -2500,7 +2518,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 	processAnnouncement(batch.chanUpdAnn1, true, true)
 
 	// At this point, the channel should still be considered a zombie.
-	_, _, _, err = ctx.router.GetChannelByID(chanID)
+	_, _, _, err = tCtx.router.GetChannelByID(chanID)
 	require.ErrorIs(t, err, graphdb.ErrZombieEdge)
 
 	// Attempting to process the current channel update should fail due to
@@ -2532,12 +2550,12 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 	// until the channel announcement is. Since the channel update indicates
 	// a fresh new update, the gossiper should stash it until it sees the
 	// corresponding channel announcement.
-	updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
+	updateErrChan := tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
 	)
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("expected to not broadcast live channel update " +
 			"without announcement")
 	case <-time.After(2 * trickleDelay):
@@ -2560,7 +2578,7 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 	}
 
 	select {
-	case msgWithSenders := <-ctx.broadcastedMessage:
+	case msgWithSenders := <-tCtx.broadcastedMessage:
 		assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("expected to broadcast live channel update")
@@ -2572,11 +2590,12 @@ func TestProcessZombieEdgeNowLive(t *testing.T) {
 // be reprocessed later, after our ChannelAnnouncement.
 func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -2586,12 +2605,12 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	// directly from the gossiper.
 	sentMsgs := make(chan lnwire.Message, 10)
 	remotePeer := &mockPeer{
-		remoteKey, sentMsgs, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentMsgs, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Override NotifyWhenOnline to return the remote peer which we expect
 	// messages to be sent to.
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
 		peerChan chan<- lnpeer.Peer) {
 
 		peerChan <- remotePeer
@@ -2600,19 +2619,21 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	// Recreate the case where the remote node is sending us its ChannelUpdate
 	// before we have been able to process our own ChannelAnnouncement and
 	// ChannelUpdate.
-	errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
+	errRemoteAnn := tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
 	)
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
+	err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
+	)
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -2621,7 +2642,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	// we did not already know about, it should have been added
 	// to the map of premature ChannelUpdates. Check that nothing
 	// was added to the graph.
-	chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
+	chanInfo, e1, e2, err := tCtx.router.GetChannelByID(
+		batch.chanUpdAnn1.ShortChannelID,
+	)
 	if !errors.Is(err, graphdb.ErrEdgeNotFound) {
 		t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
 	}
@@ -2637,32 +2660,32 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 
 	// Recreate lightning network topology. Initialize router with channel
 	// between two nodes.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn)
+	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn)
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1)
+	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1)
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1)
+	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1)
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -2689,7 +2712,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	}
 
 	// Check that the ChannelEdgePolicy was added to the graph.
-	chanInfo, e1, e2, err = ctx.router.GetChannelByID(
+	chanInfo, e1, e2, err = tCtx.router.GetChannelByID(
 		batch.chanUpdAnn1.ShortChannelID,
 	)
 	require.NoError(t, err, "unable to get channel from router")
@@ -2705,19 +2728,19 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
-	err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn)
+	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn)
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
 	}
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("announcements were broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
 
 	number := 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -2733,8 +2756,8 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 		t.Fatal("wrong number of objects in storage")
 	}
 
-	err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	)
 	if err != nil {
 		t.Fatalf("unable to process :%v", err)
@@ -2742,14 +2765,14 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 
 	for i := 0; i < 4; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
 	}
 
 	number = 0
-	if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
+	if err := tCtx.gossiper.cfg.WaitingProofStore.ForAll(
 		func(*channeldb.WaitingProof) error {
 			number++
 			return nil
@@ -2771,8 +2794,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 // currently know of.
 func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
 	remotePeer := &mockPeer{
@@ -2783,7 +2807,7 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
 	// that we don't know of ourselves, but should still include in the
 	// final signature check.
 	extraBytes := []byte("gotta validate this still!")
-	ca, err := ctx.createRemoteChannelAnnouncement(
+	ca, err := tCtx.createRemoteChannelAnnouncement(
 		0, withExtraBytes(extraBytes),
 	)
 	require.NoError(t, err, "can't create channel announcement")
@@ -2791,7 +2815,9 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
 	// We'll now send the announcement to the main gossiper. We should be
 	// able to validate this announcement to problem.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, ca, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -2805,9 +2831,10 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
 // know of.
 func TestExtraDataChannelUpdateValidation(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	timestamp := testTimestamp
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
 	remotePeer := &mockPeer{
@@ -2817,7 +2844,7 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) {
 	// In this scenario, we'll create two announcements, one regular
 	// channel announcement, and another channel update announcement, that
 	// has additional data that we won't be interpreting.
-	chanAnn, err := ctx.createRemoteChannelAnnouncement(0)
+	chanAnn, err := tCtx.createRemoteChannelAnnouncement(0)
 	require.NoError(t, err, "unable to create chan ann")
 	chanUpdAnn1, err := createUpdateAnnouncement(
 		0, 0, remoteKeyPriv1, timestamp,
@@ -2833,21 +2860,27 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) {
 	// We should be able to properly validate all three messages without
 	// any issue.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanAnn, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, chanAnn, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn1, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, chanUpdAnn1, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn2, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, chanUpdAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -2859,8 +2892,9 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) {
 // currently know of.
 func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
 	remotePeer := &mockPeer{
@@ -2877,7 +2911,9 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
 	require.NoError(t, err, "can't create node announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(nodeAnn, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, nodeAnn, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -2929,11 +2965,12 @@ func assertProcessAnnouncement(t *testing.T, result chan error) {
 // the retransmit ticker ticks.
 func TestRetransmit(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -2944,39 +2981,39 @@ func TestRetransmit(t *testing.T) {
 	// announcement. No messages should be broadcasted yet, since no proof
 	// has been exchanged.
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn),
+		t, tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn),
 	)
-	assertBroadcast(t, ctx, 0)
+	assertBroadcast(t, tCtx, 0)
 
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1),
+		t, tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1),
 	)
-	assertBroadcast(t, ctx, 0)
+	assertBroadcast(t, tCtx, 0)
 
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1),
+		t, tCtx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1),
 	)
-	assertBroadcast(t, ctx, 0)
+	assertBroadcast(t, tCtx, 0)
 
 	// Add the remote channel update to the gossiper. Similarly, nothing
 	// should be broadcasted.
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessRemoteAnnouncement(
-			batch.chanUpdAnn2, remotePeer,
+		t, tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, batch.chanUpdAnn2, remotePeer,
 		),
 	)
-	assertBroadcast(t, ctx, 0)
+	assertBroadcast(t, tCtx, 0)
 
 	// Now add the local and remote proof to the gossiper, which should
 	// trigger a broadcast of the announcements.
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn),
+		t, tCtx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn),
 	)
-	assertBroadcast(t, ctx, 0)
+	assertBroadcast(t, tCtx, 0)
 
 	assertProcessAnnouncement(
-		t, ctx.gossiper.ProcessRemoteAnnouncement(
-			batch.remoteProofAnn, remotePeer,
+		t, tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, batch.remoteProofAnn, remotePeer,
 		),
 	)
 
@@ -2988,7 +3025,7 @@ func TestRetransmit(t *testing.T) {
 		t.Helper()
 
 		num := chanAnns + chanUpds + nodeAnns
-		anns := assertBroadcast(t, ctx, num)
+		anns := assertBroadcast(t, tCtx, num)
 
 		// Count the received announcements.
 		var chanAnn, chanUpd, nodeAnn int
@@ -3016,12 +3053,15 @@ func TestRetransmit(t *testing.T) {
 	// update.
 	checkAnnouncements(t, 1, 2, 1)
 
+	retransmit, ok := tCtx.gossiper.cfg.RetransmitTicker.(*ticker.Force)
+	require.True(t, ok)
+
 	// Now let the retransmit ticker tick, which should trigger updates to
 	// be rebroadcast.
 	now := time.Unix(int64(testTimestamp), 0)
 	future := now.Add(rebroadcastInterval + 10*time.Second)
 	select {
-	case ctx.gossiper.cfg.RetransmitTicker.(*ticker.Force).Force <- future:
+	case retransmit.Force <- future:
 	case <-time.After(2 * time.Second):
 		t.Fatalf("unable to force tick")
 	}
@@ -3035,11 +3075,12 @@ func TestRetransmit(t *testing.T) {
 // no existing channels in the graph do not get forwarded.
 func TestNodeAnnouncementNoChannels(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
-	batch, err := ctx.createRemoteAnnouncements(0)
+	batch, err := tCtx.createRemoteAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -3048,8 +3089,9 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 
 	// Process the remote node announcement.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3058,7 +3100,7 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 
 	// Since no channels or node announcements were already in the graph,
 	// the node announcement should be ignored, and not forwarded.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -3066,16 +3108,18 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	// Now add the node's channel to the graph by processing the channel
 	// announcement and channel update.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanAnn,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process announcement")
 
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3083,7 +3127,9 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 
 	// Now process the node announcement again.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3093,7 +3139,7 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	// the channel announcement and update be.
 	for i := 0; i < 3; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
 	}
@@ -3102,15 +3148,16 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	// Processing the same node announcement again should be ignored, as it
 	// is stale.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2,
-		remotePeer):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process announcement")
 
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -3120,11 +3167,12 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 // validate the msg flags and max HTLC field of a ChannelUpdate.
 func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
-	ctx, err := createTestCtx(t, 0, false)
+	tCtx, err := createTestCtx(t, 0, false)
 	require.NoError(t, err, "can't create context")
 
-	processRemoteAnnouncement := ctx.gossiper.ProcessRemoteAnnouncement
+	processRemoteAnnouncement := tCtx.gossiper.ProcessRemoteAnnouncement
 
 	chanUpdateHeight := uint32(0)
 	timestamp := uint32(123456)
@@ -3132,11 +3180,11 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 
 	// In this scenario, we'll test whether the message flags field in a
 	// channel update is properly handled.
-	chanAnn, err := ctx.createRemoteChannelAnnouncement(chanUpdateHeight)
+	chanAnn, err := tCtx.createRemoteChannelAnnouncement(chanUpdateHeight)
 	require.NoError(t, err, "can't create channel announcement")
 
 	select {
-	case err = <-processRemoteAnnouncement(chanAnn, nodePeer):
+	case err = <-processRemoteAnnouncement(ctx, chanAnn, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3156,7 +3204,7 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 	}
 
 	select {
-	case err = <-processRemoteAnnouncement(chanUpdAnn, nodePeer):
+	case err = <-processRemoteAnnouncement(ctx, chanUpdAnn, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3173,7 +3221,7 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 	}
 
 	select {
-	case err = <-processRemoteAnnouncement(chanUpdAnn, nodePeer):
+	case err = <-processRemoteAnnouncement(ctx, chanUpdAnn, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3189,7 +3237,7 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 	}
 
 	select {
-	case err = <-processRemoteAnnouncement(chanUpdAnn, nodePeer):
+	case err = <-processRemoteAnnouncement(ctx, chanUpdAnn, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3206,7 +3254,7 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 	}
 
 	select {
-	case err = <-processRemoteAnnouncement(chanUpdAnn, nodePeer):
+	case err = <-processRemoteAnnouncement(ctx, chanUpdAnn, nodePeer):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
@@ -3217,13 +3265,14 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
 // channel is always sent upon the remote party reconnecting.
 func TestSendChannelUpdateReliably(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	// We'll start by creating our test context and a batch of
 	// announcements.
-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "unable to create test context")
 
-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")
 
 	// We'll also create two keys, one for ourselves and another for the
@@ -3236,7 +3285,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// gossiper to the remote peer.
 	sentToPeer := make(chan lnwire.Message, 1)
 	remotePeer := &mockPeer{
-		remoteKey, sentToPeer, ctx.gossiper.quit, atomic.Bool{},
+		remoteKey, sentToPeer, tCtx.gossiper.quit, atomic.Bool{},
 	}
 
 	// Since we first wait to be notified of the peer before attempting to
@@ -3244,13 +3293,13 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// NotifyWhenOffline to instead give us access to the channel that will
 	// receive the notification.
 	notifyOnline := make(chan chan<- lnpeer.Peer, 1)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
 		peerChan chan<- lnpeer.Peer) {

 		notifyOnline <- peerChan
 	}
 	notifyOffline := make(chan chan struct{}, 1)
-	ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func(
+	tCtx.gossiper.reliableSender.cfg.NotifyWhenOffline = func(
 		_ [33]byte) <-chan struct{} {

 		c := make(chan struct{}, 1)
@@ -3275,7 +3324,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// Process the channel announcement for which we'll send a channel
 	// update for.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local channel announcement")
 	}
@@ -3283,14 +3332,14 @@ func TestSendChannelUpdateReliably(t *testing.T) {

 	// It should not be broadcast due to not having an announcement proof.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	// Now, we'll process the channel update.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local channel update")
 	}
@@ -3299,7 +3348,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// It should also not be broadcast due to the announcement not having an
 	// announcement proof.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -3348,7 +3397,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// With the new update created, we'll go ahead and process it.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.chanUpdAnn1,
 	):
 	case <-time.After(2 * time.Second):
@@ -3359,7 +3408,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// It should also not be broadcast due to the announcement not having an
 	// announcement proof.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -3387,7 +3436,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// We'll then exchange proofs with the remote peer in order to announce
 	// the channel.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		batch.localProofAnn,
 	):
 	case <-time.After(2 * time.Second):
@@ -3397,7 +3446,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// No messages should be broadcast as we don't have the full proof yet.
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -3406,8 +3455,8 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	assertMsgSent(batch.localProofAnn)

 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote channel proof")
@@ -3418,7 +3467,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// channel has been announced.
 	for i := 0; i < 2; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(2 * trickleDelay):
 			t.Fatal("expected channel to be announced")
 		}
@@ -3440,7 +3489,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// directly since the reliable sender only applies when the channel is
 	// not announced.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
 		newChannelUpdate,
 	):
 	case <-time.After(2 * time.Second):
@@ -3448,7 +3497,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process local channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 	case <-time.After(2 * trickleDelay):
 		t.Fatal("channel update was not broadcast")
 	}
@@ -3495,7 +3544,7 @@ func TestSendChannelUpdateReliably(t *testing.T) {
 	// Since the messages above are now deemed as stale, they should be
 	// removed from the message store.
 	err = wait.NoError(func() error {
-		msgs, err := ctx.gossiper.cfg.MessageStore.Messages()
+		msgs, err := tCtx.gossiper.cfg.MessageStore.Messages()
 		if err != nil {
 			return fmt.Errorf("unable to retrieve pending "+
 				"messages: %v", err)
@@ -3533,7 +3582,9 @@ func sendRemoteMsg(t *testing.T, ctx *testCtx, msg lnwire.Message,
 	t.Helper()

 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(msg, remotePeer):
+	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
+		context.Background(), msg, remotePeer,
+	):
 		if err != nil {
 			t.Fatalf("unable to process channel msg: %v", err)
 		}
@@ -3933,14 +3984,15 @@ func (m *SyncManager) markGraphSyncing() {
 // initial historical sync has completed.
 func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

-	ctx, err := createTestCtx(t, 10, false)
+	tCtx, err := createTestCtx(t, 10, false)
 	require.NoError(t, err, "can't create context")

 	// We'll mark the graph as not synced. This should prevent us from
 	// broadcasting any messages we've received as part of our initial
 	// historical sync.
-	ctx.gossiper.syncMgr.markGraphSyncing()
+	tCtx.gossiper.syncMgr.markGraphSyncing()

 	assertBroadcast := func(msg lnwire.Message, isRemote bool,
 		shouldBroadcast bool) {
@@ -3952,11 +4004,11 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
 		}
 		var errChan chan error
 		if isRemote {
-			errChan = ctx.gossiper.ProcessRemoteAnnouncement(
-				msg, nodePeer,
+			errChan = tCtx.gossiper.ProcessRemoteAnnouncement(
+				ctx, msg, nodePeer,
 			)
 		} else {
-			errChan = ctx.gossiper.ProcessLocalAnnouncement(msg)
+			errChan = tCtx.gossiper.ProcessLocalAnnouncement(msg)
 		}

 		select {
@@ -3970,7 +4022,7 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
 		}

 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 			if !shouldBroadcast {
 				t.Fatal("gossip message was broadcast")
 			}
@@ -3983,7 +4035,7 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {

 	// A remote channel announcement should not be broadcast since the graph
 	// has not yet been synced.
-	chanAnn1, err := ctx.createRemoteChannelAnnouncement(0)
+	chanAnn1, err := tCtx.createRemoteChannelAnnouncement(0)
 	require.NoError(t, err, "unable to create channel announcement")

 	assertBroadcast(chanAnn1, true, false)
@@ -3995,9 +4047,9 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {

 	// Mark the graph as synced, which should allow the channel announcement
 	// should to be broadcast.
-	ctx.gossiper.syncMgr.markGraphSynced()
+	tCtx.gossiper.syncMgr.markGraphSynced()

-	chanAnn2, err := ctx.createRemoteChannelAnnouncement(1)
+	chanAnn2, err := tCtx.createRemoteChannelAnnouncement(1)
 	require.NoError(t, err, "unable to create channel announcement")
 	assertBroadcast(chanAnn2, true, true)
 }
@@ -4010,15 +4062,16 @@
 // is tested by TestRateLimitChannelUpdates.
 func TestRateLimitDeDup(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

 	// Create our test harness.
 	const blockHeight = 100
-	ctx, err := createTestCtx(t, blockHeight, false)
+	tCtx, err := createTestCtx(t, blockHeight, false)
 	require.NoError(t, err, "can't create context")
-	ctx.gossiper.cfg.RebroadcastInterval = time.Hour
+	tCtx.gossiper.cfg.RebroadcastInterval = time.Hour

 	var findBaseByAliasCount atomic.Int32
-	ctx.gossiper.cfg.FindBaseByAlias = func(alias lnwire.ShortChannelID) (
+	tCtx.gossiper.cfg.FindBaseByAlias = func(alias lnwire.ShortChannelID) (
 		lnwire.ShortChannelID, error) {

 		findBaseByAliasCount.Add(1)
@@ -4027,33 +4080,33 @@ func TestRateLimitDeDup(t *testing.T) {
 	}

 	getUpdateEdgeCount := func() int {
-		ctx.router.mu.Lock()
-		defer ctx.router.mu.Unlock()
+		tCtx.router.mu.Lock()
+		defer tCtx.router.mu.Unlock()

-		return ctx.router.updateEdgeCount
+		return tCtx.router.updateEdgeCount
 	}

 	// We set the burst to 2 here. The very first update should not count
 	// towards this _and_ any duplicates should also not count towards it.
-	ctx.gossiper.cfg.MaxChannelUpdateBurst = 2
-	ctx.gossiper.cfg.ChannelUpdateInterval = time.Minute
+	tCtx.gossiper.cfg.MaxChannelUpdateBurst = 2
+	tCtx.gossiper.cfg.ChannelUpdateInterval = time.Minute

 	// The graph should start empty.
-	require.Empty(t, ctx.router.infos)
-	require.Empty(t, ctx.router.edges)
+	require.Empty(t, tCtx.router.infos)
+	require.Empty(t, tCtx.router.edges)

 	// We'll create a batch of signed announcements, including updates for
 	// both sides, for a channel and process them. They should all be
 	// forwarded as this is our first time learning about the channel.
-	batch, err := ctx.createRemoteAnnouncements(blockHeight)
+	batch, err := tCtx.createRemoteAnnouncements(blockHeight)
 	require.NoError(t, err)

 	nodePeer1 := &mockPeer{
 		remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{},
 	}
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, nodePeer1,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, nodePeer1,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4061,8 +4114,8 @@ func TestRateLimitDeDup(t *testing.T) {
 	}

 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn1, nodePeer1,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn1, nodePeer1,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4073,8 +4126,8 @@ func TestRateLimitDeDup(t *testing.T) {
 	nodePeer2 := &mockPeer{
 		remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{},
 	}
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, nodePeer2,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, nodePeer2,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4084,21 +4137,21 @@ func TestRateLimitDeDup(t *testing.T) {
 	timeout := time.After(2 * trickleDelay)
 	for i := 0; i < 3; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-timeout:
 			t.Fatal("expected announcement to be broadcast")
 		}
 	}

 	shortChanID := batch.chanAnn.ShortChannelID.ToUint64()
-	require.Contains(t, ctx.router.infos, shortChanID)
-	require.Contains(t, ctx.router.edges, shortChanID)
+	require.Contains(t, tCtx.router.infos, shortChanID)
+	require.Contains(t, tCtx.router.edges, shortChanID)

 	// Before we send anymore updates, we want to let our test harness
 	// hang during GetChannelByID so that we can ensure that two threads are
 	// waiting for the chan.
 	pause := make(chan struct{})
-	ctx.router.pauseGetChannelByID <- pause
+	tCtx.router.pauseGetChannelByID <- pause

 	// Take note of how many times FindBaseByAlias has been called.
 	// It should be 2 since we have processed two channel updates.
@@ -4123,10 +4176,14 @@ func TestRateLimitDeDup(t *testing.T) {
 	// succession. We wait for both to have hit the FindBaseByAlias check
 	// before we un-pause the GetChannelByID call.
 	go func() {
-		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+		tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, &update, nodePeer1,
+		)
 	}()
 	go func() {
-		ctx.gossiper.ProcessRemoteAnnouncement(&update, nodePeer1)
+		tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, &update, nodePeer1,
+		)
 	}()

 	// We know that both are being processed once the count for
@@ -4164,7 +4221,7 @@ func TestRateLimitDeDup(t *testing.T) {
 	t.Helper()

 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		require.True(t, shouldBroadcast)
 	case <-time.After(2 * trickleDelay):
 		require.False(t, shouldBroadcast)
@@ -4173,8 +4230,8 @@ func TestRateLimitDeDup(t *testing.T) {
 	processUpdate := func(msg lnwire.Message, peer lnpeer.Peer) {
 		select {
-		case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-			msg, peer,
+		case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, msg, peer,
 		):
 			require.NoError(t, err)
 		case <-time.After(time.Second):
@@ -4202,31 +4259,32 @@
 // channel updates.
 func TestRateLimitChannelUpdates(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

 	// Create our test harness.
 	const blockHeight = 100
-	ctx, err := createTestCtx(t, blockHeight, false)
+	tCtx, err := createTestCtx(t, blockHeight, false)
 	require.NoError(t, err, "can't create context")
-	ctx.gossiper.cfg.RebroadcastInterval = time.Hour
-	ctx.gossiper.cfg.MaxChannelUpdateBurst = 5
-	ctx.gossiper.cfg.ChannelUpdateInterval = 5 * time.Second
+	tCtx.gossiper.cfg.RebroadcastInterval = time.Hour
+	tCtx.gossiper.cfg.MaxChannelUpdateBurst = 5
+	tCtx.gossiper.cfg.ChannelUpdateInterval = 5 * time.Second

 	// The graph should start empty.
-	require.Empty(t, ctx.router.infos)
-	require.Empty(t, ctx.router.edges)
+	require.Empty(t, tCtx.router.infos)
+	require.Empty(t, tCtx.router.edges)

 	// We'll create a batch of signed announcements, including updates for
 	// both sides, for a channel and process them. They should all be
 	// forwarded as this is our first time learning about the channel.
-	batch, err := ctx.createRemoteAnnouncements(blockHeight)
+	batch, err := tCtx.createRemoteAnnouncements(blockHeight)
 	require.NoError(t, err)

 	nodePeer1 := &mockPeer{
 		remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{},
 	}
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, nodePeer1,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, nodePeer1,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4234,8 +4292,8 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 	}

 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn1, nodePeer1,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn1, nodePeer1,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4246,8 +4304,8 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 	nodePeer2 := &mockPeer{
 		remoteKeyPriv2.PubKey(), nil, nil, atomic.Bool{},
 	}
 	select {
-	case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, nodePeer2,
+	case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, nodePeer2,
 	):
 		require.NoError(t, err)
 	case <-time.After(time.Second):
@@ -4257,15 +4315,15 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 	timeout := time.After(2 * trickleDelay)
 	for i := 0; i < 3; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-timeout:
 			t.Fatal("expected announcement to be broadcast")
 		}
 	}

 	shortChanID := batch.chanAnn.ShortChannelID.ToUint64()
-	require.Contains(t, ctx.router.infos, shortChanID)
-	require.Contains(t, ctx.router.edges, shortChanID)
+	require.Contains(t, tCtx.router.infos, shortChanID)
+	require.Contains(t, tCtx.router.edges, shortChanID)

 	// We'll define a helper to assert whether updates should be rate
 	// limited or not depending on their contents.
@@ -4275,14 +4333,16 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 		t.Helper()

 		select {
-		case err := <-ctx.gossiper.ProcessRemoteAnnouncement(update, peer):
+		case err := <-tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, update, peer,
+		):
 			require.NoError(t, err)
 		case <-time.After(time.Second):
 			t.Fatal("remote announcement not processed")
 		}

 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 			if shouldRateLimit {
 				t.Fatal("unexpected channel update broadcast")
 			}
@@ -4305,7 +4365,7 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 	keepAliveUpdate := *batch.chanUpdAnn1
 	keepAliveUpdate.Timestamp = uint32(
 		time.Unix(int64(batch.chanUpdAnn1.Timestamp), 0).
-			Add(ctx.gossiper.cfg.RebroadcastInterval).Unix(),
+			Add(tCtx.gossiper.cfg.RebroadcastInterval).Unix(),
 	)
 	require.NoError(t, signUpdate(remoteKeyPriv1, &keepAliveUpdate))
 	assertRateLimit(&keepAliveUpdate, nodePeer1, false)
@@ -4316,7 +4376,7 @@ func TestRateLimitChannelUpdates(t *testing.T) {
 	// seconds with a max burst of 5 per direction. We'll process the max
 	// burst of one direction first. None of these should be rate limited.
 	updateSameDirection := keepAliveUpdate
-	for i := uint32(0); i < uint32(ctx.gossiper.cfg.MaxChannelUpdateBurst); i++ {
+	for i := uint32(0); i < uint32(tCtx.gossiper.cfg.MaxChannelUpdateBurst); i++ { //nolint:ll
 		updateSameDirection.Timestamp++
 		updateSameDirection.BaseFee++
 		require.NoError(t, signUpdate(remoteKeyPriv1, &updateSameDirection))
@@ -4339,8 +4399,8 @@ func TestRateLimitChannelUpdates(t *testing.T) {

 	// Wait for the next interval to tick. Since we've only waited for one,
 	// only one more update is allowed.
-	<-time.After(ctx.gossiper.cfg.ChannelUpdateInterval)
-	for i := 0; i < ctx.gossiper.cfg.MaxChannelUpdateBurst; i++ {
+	<-time.After(tCtx.gossiper.cfg.ChannelUpdateInterval)
+	for i := 0; i < tCtx.gossiper.cfg.MaxChannelUpdateBurst; i++ {
 		updateSameDirection.Timestamp++
 		updateSameDirection.BaseFee++
 		require.NoError(t, signUpdate(remoteKeyPriv1, &updateSameDirection))
@@ -4354,11 +4414,12 @@
 // about our own channels when coming from a remote peer.
 func TestIgnoreOwnAnnouncement(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")

-	batch, err := ctx.createLocalAnnouncements(0)
+	batch, err := tCtx.createLocalAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")

 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -4367,8 +4428,8 @@ func TestIgnoreOwnAnnouncement(t *testing.T) {
 	// Try to let the remote peer tell us about the channel we are part of.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
@@ -4383,66 +4444,66 @@ func TestIgnoreOwnAnnouncement(t *testing.T) {
 	// update. No messages should be broadcast yet, since we don't have
 	// the announcement signatures.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanAnn):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process channel ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process local announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	// We should accept the remote's channel update and node announcement.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanUpdAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanUpdAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process channel update")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("channel update announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.nodeAnn2, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.nodeAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process node ann")
 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
 	case <-time.After(2 * trickleDelay):
 	}
@@ -4450,21 +4511,23 @@ func TestIgnoreOwnAnnouncement(t *testing.T) {

 	// Now we exchange the proofs, the messages will be broadcasted to the
 	// network.
 	select {
-	case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn):
+	case err = <-tCtx.gossiper.ProcessLocalAnnouncement(
+		batch.localProofAnn,
+	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process local proof")

 	select {
-	case <-ctx.broadcastedMessage:
+	case <-tCtx.broadcastedMessage:
 		t.Fatal("announcements were broadcast")
 	case <-time.After(2 * trickleDelay):
 	}

 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.remoteProofAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.remoteProofAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
@@ -4473,7 +4536,7 @@ func TestIgnoreOwnAnnouncement(t *testing.T) {

 	for i := 0; i < 5; i++ {
 		select {
-		case <-ctx.broadcastedMessage:
+		case <-tCtx.broadcastedMessage:
 		case <-time.After(time.Second):
 			t.Fatal("announcement wasn't broadcast")
 		}
@@ -4482,8 +4545,8 @@ func TestIgnoreOwnAnnouncement(t *testing.T) {
 	// Finally, we again check that we'll ignore the remote giving us
 	// announcements about our own channel.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
@@ -4498,13 +4561,14 @@
 // error.
 func TestRejectCacheChannelAnn(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

-	ctx, err := createTestCtx(t, proofMatureDelta, false)
+	tCtx, err := createTestCtx(t, proofMatureDelta, false)
 	require.NoError(t, err, "can't create context")

 	// First, we create a channel announcement to send over to our test
 	// peer.
-	batch, err := ctx.createRemoteAnnouncements(0)
+	batch, err := tCtx.createRemoteAnnouncements(0)
 	require.NoError(t, err, "can't generate announcements")

 	remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:])
@@ -4514,12 +4578,12 @@ func TestRejectCacheChannelAnn(t *testing.T) {
 	// Before sending over the announcement, we'll modify it such that we
 	// know it will always fail.
 	chanID := batch.chanAnn.ShortChannelID.ToUint64()
-	ctx.router.queueValidationFail(chanID)
+	tCtx.router.queueValidationFail(chanID)

 	// If we process the batch the first time we should get an error.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, remotePeer,
 	):
 		require.NotNil(t, err)
 	case <-time.After(2 * time.Second):
@@ -4529,8 +4593,8 @@ func TestRejectCacheChannelAnn(t *testing.T) {
 	// If we process it a *second* time, then we should get an error saying
 	// we rejected it already.
 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-		batch.chanAnn, remotePeer,
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, batch.chanAnn, remotePeer,
 	):
 		errStr := err.Error()
 		require.Contains(t, errStr, "recently rejected")
@@ -4578,8 +4642,9 @@ func TestFutureMsgCacheEviction(t *testing.T) {
 // channel announcements are banned properly.
 func TestChanAnnBanningNonChanPeer(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

-	ctx, err := createTestCtx(t, 1000, false)
+	tCtx, err := createTestCtx(t, 1000, false)
 	require.NoError(t, err, "can't create context")

 	nodePeer1 := &mockPeer{
@@ -4594,15 +4659,15 @@ func TestChanAnnBanningNonChanPeer(t *testing.T) {
 		// Craft a valid channel announcement for a channel we don't
 		// have. We will ensure that it fails validation by modifying
 		// the tx script.
-		ca, err := ctx.createRemoteChannelAnnouncement(
+		ca, err := tCtx.createRemoteChannelAnnouncement(
 			uint32(i), withFundingTxPrep(fundingTxPrepTypeInvalidOutput),
 		)
 		require.NoError(t, err, "can't create channel announcement")

 		select {
-		case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-			ca, nodePeer1,
+		case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, ca, nodePeer1,
 		):
 			require.ErrorIs(t, err, ErrInvalidFundingOutput)
@@ -4612,20 +4677,23 @@ func TestChanAnnBanningNonChanPeer(t *testing.T) {
 	}

 	// The peer should be banned now.
-	require.True(t, ctx.gossiper.isBanned(nodePeer1.PubKey()))
+	require.True(t, tCtx.gossiper.isBanned(nodePeer1.PubKey()))

 	// Assert that nodePeer has been disconnected.
 	require.True(t, nodePeer1.disconnected.Load())

 	// Mark the UTXO as spent so that we get the ErrChannelSpent error and
 	// can thus tests that the gossiper ignores closed channels.
-	ca, err := ctx.createRemoteChannelAnnouncement(
+	ca, err := tCtx.createRemoteChannelAnnouncement(
 		101, withFundingTxPrep(fundingTxPrepTypeSpent),
 	)
 	require.NoError(t, err, "can't create channel announcement")

 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer2):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, ca, nodePeer2,
+	):
+
 		require.ErrorIs(t, err, ErrChannelSpent)

 	case <-time.After(2 * time.Second):
@@ -4633,7 +4701,7 @@ func TestChanAnnBanningNonChanPeer(t *testing.T) {
 	}

 	// Check that the announcement's scid is marked as closed.
-	isClosed, err := ctx.gossiper.cfg.ScidCloser.IsClosedScid(
+	isClosed, err := tCtx.gossiper.cfg.ScidCloser.IsClosedScid(
 		ca.ShortChannelID,
 	)
 	require.Nil(t, err)
@@ -4645,16 +4713,19 @@ func TestChanAnnBanningNonChanPeer(t *testing.T) {
 		sourceToPub(nodePeer2.IdentityKey()),
 	)

-	ctx.gossiper.recentRejects.Delete(key)
+	tCtx.gossiper.recentRejects.Delete(key)

 	// The validateFundingTransaction method will mark this channel
 	// as a zombie if any error occurs in the chanvalidate.Validate call.
 	// For the sake of the rest of the test, however, we mark it as live
 	// here.
-	_ = ctx.router.MarkEdgeLive(ca.ShortChannelID)
+	_ = tCtx.router.MarkEdgeLive(ca.ShortChannelID)

 	select {
-	case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer2):
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, ca, nodePeer2,
+	):
+
 		require.ErrorContains(t, err, "ignoring closed channel")

 	case <-time.After(2 * time.Second):
@@ -4666,8 +4737,9 @@
 // get disconnected.
 func TestChanAnnBanningChanPeer(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()

-	ctx, err := createTestCtx(t, 1000, true)
+	tCtx, err := createTestCtx(t, 1000, true)
 	require.NoError(t, err, "can't create context")

 	nodePeer := &mockPeer{remoteKeyPriv1.PubKey(), nil, nil, atomic.Bool{}}
@@ -4677,15 +4749,15 @@ func TestChanAnnBanningChanPeer(t *testing.T) {
 		// Craft a valid channel announcement for a channel we don't
 		// have. We will ensure that it fails validation by modifying
 		// the router.
-		ca, err := ctx.createRemoteChannelAnnouncement(
+		ca, err := tCtx.createRemoteChannelAnnouncement(
 			uint32(i), withFundingTxPrep(fundingTxPrepTypeInvalidOutput),
 		)
 		require.NoError(t, err, "can't create channel announcement")

 		select {
-		case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
-			ca, nodePeer,
+		case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+			ctx, ca, nodePeer,
 		):
 			require.ErrorIs(t, err, ErrInvalidFundingOutput)
@@ -4695,7 +4767,7 @@ func TestChanAnnBanningChanPeer(t *testing.T) {
 	}

 	// The peer should be banned now.
-	require.True(t, ctx.gossiper.isBanned(nodePeer.PubKey()))
+	require.True(t, tCtx.gossiper.isBanned(nodePeer.PubKey()))

 	// Assert that the peer wasn't disconnected.
 	require.False(t, nodePeer.disconnected.Load())
diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go
index 32a90ae5f..366dc26d2 100644
--- a/discovery/syncer_test.go
+++ b/discovery/syncer_test.go
@@ -454,7 +454,7 @@ func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
 	}()

 	// We'll now attempt to apply the gossip filter for the remote peer.
-	syncer.ApplyGossipFilter(ctx, remoteHorizon)
+	require.NoError(t, syncer.ApplyGossipFilter(ctx, remoteHorizon))

 	// Ensure that the syncer's remote horizon was properly updated.
 	if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
diff --git a/peer/brontide.go b/peer/brontide.go
index da4aa610a..17e9bc409 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -1989,9 +1989,13 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
 // channel announcements.
 func newDiscMsgStream(p *Brontide) *msgStream {
 	apply := func(msg lnwire.Message) {
+		// TODO(elle): thread contexts through the peer system properly
+		// so that a parent context can be passed in here.
+		ctx := context.TODO()
+
 		// TODO(yy): `ProcessRemoteAnnouncement` returns an error chan
 		// and we need to process it.
-		p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
+		p.cfg.AuthGossiper.ProcessRemoteAnnouncement(ctx, msg, p)
 	}

 	return newMsgStream(