diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ac95d55ba..c1f81df53 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -860,6 +860,8 @@ func (d *AuthenticatedGossiper) stop() { func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, peer lnpeer.Peer) chan error { + ctx := context.TODO() + log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey()) errChan := make(chan error, 1) @@ -907,7 +909,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // If we've found the message target, then we'll dispatch the // message directly to it. - if err := syncer.ApplyGossipFilter(m); err != nil { + if err := syncer.ApplyGossipFilter(ctx, m); err != nil { log.Warnf("Unable to apply gossip filter for peer=%x: "+ "%v", peer.PubKey(), err) @@ -1404,7 +1406,7 @@ func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) { // sendRemoteBatch broadcasts a list of remotely generated announcements to our // peers. -func (d *AuthenticatedGossiper) sendRemoteBatch(_ context.Context, +func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context, annBatch []msgWithSenders) { syncerPeers := d.syncMgr.GossipSyncers() @@ -1413,7 +1415,7 @@ func (d *AuthenticatedGossiper) sendRemoteBatch(_ context.Context, // that have active gossip syncers active. for pub, syncer := range syncerPeers { log.Tracef("Sending messages batch to GossipSyncer(%s)", pub) - syncer.FilterGossipMsgs(annBatch...) + syncer.FilterGossipMsgs(ctx, annBatch...) } for _, msgChunk := range annBatch { diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 65e0774a4..172598848 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -380,7 +380,7 @@ func (m *SyncManager) syncerHandler() { } m.syncersMu.Unlock() - s.Start() + s.Start(context.TODO()) // Once we create the GossipSyncer, we'll signal to the // caller that they can proceed since the SyncManager's diff --git a/discovery/syncer.go b/discovery/syncer.go index da3749050..16a2fa720 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -405,20 +405,22 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer { // Start starts the GossipSyncer and any goroutines that it needs to carry out // its duties. -func (g *GossipSyncer) Start() { +func (g *GossipSyncer) Start(ctx context.Context) { g.started.Do(func() { log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:]) + ctx, _ := g.cg.Create(ctx) + // TODO(conner): only spawn channelGraphSyncer if remote // supports gossip queries, and only spawn replyHandler if we // advertise support if !g.cfg.noSyncChannels { g.cg.WgAdd(1) - go g.channelGraphSyncer() + go g.channelGraphSyncer(ctx) } if !g.cfg.noReplyQueries { g.cg.WgAdd(1) - go g.replyHandler() + go g.replyHandler(ctx) } }) } @@ -437,9 +439,11 @@ func (g *GossipSyncer) Stop() { // handleSyncingChans handles the state syncingChans for the GossipSyncer. When // in this state, we will send a QueryChannelRange msg to our peer and advance // the syncer's state to waitingQueryRangeReply. -func (g *GossipSyncer) handleSyncingChans() { +func (g *GossipSyncer) handleSyncingChans(ctx context.Context) { // Prepare the query msg. - queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery) + queryRangeMsg, err := g.genChanRangeQuery( + ctx, g.genHistoricalChanRangeQuery, + ) if err != nil { log.Errorf("Unable to gen chan range query: %v", err) return @@ -456,7 +460,6 @@ func (g *GossipSyncer) handleSyncingChans() { // Send the msg to the remote peer, which is non-blocking as // `sendToPeer` only queues the msg in Brontide. - ctx, _ := g.cg.Create(context.Background()) err = g.cfg.sendToPeer(ctx, queryRangeMsg) if err != nil { log.Errorf("Unable to send chan range query: %v", err) @@ -471,7 +474,7 @@ func (g *GossipSyncer) handleSyncingChans() { // channelGraphSyncer is the main goroutine responsible for ensuring that we // properly channel graph state with the remote peer, and also that we only // send them messages which actually pass their defined update horizon. -func (g *GossipSyncer) channelGraphSyncer() { +func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) { defer g.cg.WgDone() for { @@ -488,7 +491,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // understand, as we'll as responding to any other queries by // them. case syncingChans: - g.handleSyncingChans() + g.handleSyncingChans(ctx) // In this state, we've sent out our initial channel range // query and are waiting for the final response from the remote @@ -507,7 +510,9 @@ func (g *GossipSyncer) channelGraphSyncer() { // for the new channels. queryReply, ok := msg.(*lnwire.ReplyChannelRange) if ok { - err := g.processChanRangeReply(queryReply) + err := g.processChanRangeReply( + ctx, queryReply, + ) if err != nil { log.Errorf("Unable to "+ "process chan range "+ @@ -630,13 +635,13 @@ func (g *GossipSyncer) channelGraphSyncer() { // from the state machine maintained on the same node. // // NOTE: This method MUST be run as a goroutine. -func (g *GossipSyncer) replyHandler() { +func (g *GossipSyncer) replyHandler(ctx context.Context) { defer g.cg.WgDone() for { select { case msg := <-g.queryMsgs: - err := g.replyPeerQueries(msg) + err := g.replyPeerQueries(ctx, msg) switch { case err == ErrGossipSyncerExiting: return @@ -759,7 +764,9 @@ func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange, // processChanRangeReply is called each time the GossipSyncer receives a new // reply to the initial range query to discover new channels that it didn't // previously know of. -func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error { +func (g *GossipSyncer) processChanRangeReply(_ context.Context, + msg *lnwire.ReplyChannelRange) error { + // isStale returns whether the timestamp is too far into the past. isStale := func(timestamp time.Time) bool { return time.Since(timestamp) > graph.DefaultChannelPruneExpiry @@ -948,7 +955,7 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro // party when we're kicking off the channel graph synchronization upon // connection. The historicalQuery boolean can be used to generate a query from // the genesis block of the chain. -func (g *GossipSyncer) genChanRangeQuery( +func (g *GossipSyncer) genChanRangeQuery(_ context.Context, historicalQuery bool) (*lnwire.QueryChannelRange, error) { // First, we'll query our channel graph time series for its highest @@ -1005,18 +1012,20 @@ func (g *GossipSyncer) genChanRangeQuery( // replyPeerQueries is called in response to any query by the remote peer. // We'll examine our state and send back our best response. -func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error { +func (g *GossipSyncer) replyPeerQueries(ctx context.Context, + msg lnwire.Message) error { + switch msg := msg.(type) { // In this state, we'll also handle any incoming channel range queries // from the remote peer as they're trying to sync their state as well. case *lnwire.QueryChannelRange: - return g.replyChanRangeQuery(msg) + return g.replyChanRangeQuery(ctx, msg) // If the remote peer skips straight to requesting new channels that // they don't know of, then we'll ensure that we also handle this case. case *lnwire.QueryShortChanIDs: - return g.replyShortChanIDs(msg) + return g.replyShortChanIDs(ctx, msg) default: return fmt.Errorf("unknown message: %T", msg) @@ -1028,7 +1037,9 @@ func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error { // meet the channel range, then chunk our responses to the remote node. We also // ensure that our final fragment carries the "complete" bit to indicate the // end of our streaming response. -func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error { +func (g *GossipSyncer) replyChanRangeQuery(_ context.Context, + query *lnwire.QueryChannelRange) error { + // Before responding, we'll check to ensure that the remote peer is // querying for the same chain that we're on. If not, we'll send back a // response with a complete value of zero to indicate we're on a @@ -1209,7 +1220,9 @@ func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) erro // node for information concerning a set of short channel ID's. Our response // will be sent in a streaming chunked manner to ensure that we remain below // the current transport level message size. -func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error { +func (g *GossipSyncer) replyShortChanIDs(ctx context.Context, + query *lnwire.QueryShortChanIDs) error { + // Before responding, we'll check to ensure that the remote peer is // querying for the same chain that we're on. If not, we'll send back a // response with a complete value of zero to indicate we're on a @@ -1219,8 +1232,6 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error "chain=%v, we're on chain=%v", query.ChainHash, g.cfg.chainHash) - ctx, _ := g.cg.Create(context.Background()) - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 0, @@ -1261,8 +1272,6 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error // Regardless of whether we had any messages to reply with, send over // the sentinel message to signal that the stream has terminated. - ctx, _ := g.cg.Create(context.Background()) - return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{ ChainHash: query.ChainHash, Complete: 1, @@ -1272,7 +1281,9 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error // ApplyGossipFilter applies a gossiper filter sent by the remote node to the // state machine. Once applied, we'll ensure that we don't forward any messages // to the peer that aren't within the time range of the filter. -func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error { +func (g *GossipSyncer) ApplyGossipFilter(_ context.Context, + filter *lnwire.GossipTimestampRange) error { + g.Lock() g.remoteUpdateHorizon = filter @@ -1351,7 +1362,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er // FilterGossipMsgs takes a set of gossip messages, and only send it to a peer // iff the message is within the bounds of their set gossip filter. If the peer // doesn't have a gossip filter set, then no messages will be forwarded. -func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) { +func (g *GossipSyncer) FilterGossipMsgs(_ context.Context, + msgs ...msgWithSenders) { + // If the peer doesn't have an update horizon set, then we won't send // it any new update messages. if g.remoteUpdateHorizon == nil { diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index a63997909..32a90ae5f 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -228,6 +228,7 @@ func newTestSyncer(hID lnwire.ShortChannelID, // doesn't have a horizon set, then we won't send any incoming messages to it. func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -249,7 +250,7 @@ func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) { // We'll then attempt to filter the set of messages through the target // peer. - syncer.FilterGossipMsgs(msgs...) + syncer.FilterGossipMsgs(ctx, msgs...) // As the remote peer doesn't yet have a gossip timestamp set, we // shouldn't receive any outbound messages. @@ -273,6 +274,7 @@ func unixStamp(a int64) uint32 { // channel ann that already has a channel update on disk. func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -384,7 +386,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { }() // We'll then instruct the gossiper to filter this set of messages. - syncer.FilterGossipMsgs(msgs...) + syncer.FilterGossipMsgs(ctx, msgs...) // Out of all the messages we sent in, we should only get 2 of them // back. @@ -415,6 +417,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) { // messages which are within their desired time horizon. func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -451,7 +454,7 @@ func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) { }() // We'll now attempt to apply the gossip filter for the remote peer. - syncer.ApplyGossipFilter(remoteHorizon) + syncer.ApplyGossipFilter(ctx, remoteHorizon) // Ensure that the syncer's remote horizon was properly updated. if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) { @@ -475,6 +478,7 @@ func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) { // within their desired time horizon. func TestGossipSyncerApplyGossipFilter(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -515,7 +519,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { }() // We'll now attempt to apply the gossip filter for the remote peer. - err := syncer.ApplyGossipFilter(remoteHorizon) + err := syncer.ApplyGossipFilter(ctx, remoteHorizon) require.NoError(t, err, "unable to apply filter") // There should be no messages in the message queue as we didn't send @@ -563,7 +567,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { errCh <- nil } }() - err = syncer.ApplyGossipFilter(remoteHorizon) + err = syncer.ApplyGossipFilter(ctx, remoteHorizon) require.NoError(t, err, "unable to apply filter") // We should get back the exact same message. @@ -594,6 +598,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) { // channels and complete=0. func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -609,7 +614,7 @@ func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) { FirstBlockHeight: 0, NumBlocks: math.MaxUint32, } - err := syncer.replyChanRangeQuery(query) + err := syncer.replyChanRangeQuery(ctx, query) require.NoError(t, err, "unable to process short chan ID's") select { @@ -646,6 +651,7 @@ func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) { // complete=0. func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -656,7 +662,7 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) { // We'll now ask the syncer to reply to a chan ID query, but for a // chain that it isn't aware of. - err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{ + err := syncer.replyShortChanIDs(ctx, &lnwire.QueryShortChanIDs{ ChainHash: *chaincfg.SimNetParams.GenesisHash, }) require.NoError(t, err, "unable to process short chan ID's") @@ -695,6 +701,7 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) { // announcements, as well as an ending ReplyShortChanIDsEnd message. func TestGossipSyncerReplyShortChanIDs(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -745,7 +752,7 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { // With our set up above complete, we'll now attempt to obtain a reply // from the channel syncer for our target chan ID query. - err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{ + err := syncer.replyShortChanIDs(ctx, &lnwire.QueryShortChanIDs{ ShortChanIDs: queryChanIDs, }) require.NoError(t, err, "unable to query for chan IDs") @@ -800,6 +807,7 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) { // the remote peer. func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll use a smaller chunk size so we can easily test all the edge // cases. @@ -866,7 +874,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { }() // With our goroutine active, we'll now issue the query. - if err := syncer.replyChanRangeQuery(query); err != nil { + if err := syncer.replyChanRangeQuery(ctx, query); err != nil { t.Fatalf("unable to issue query: %v", err) } @@ -971,6 +979,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) { // executed with the correct block range. func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) { t.Parallel() + ctx := context.Background() // First create our test gossip syncer that will handle and // respond to the test queries @@ -1052,7 +1061,8 @@ func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) { // will be reached go func() { for _, query := range queryReqs { - if err := syncer.replyChanRangeQuery(query); err != nil { + err := syncer.replyChanRangeQuery(ctx, query) + if err != nil { errCh <- fmt.Errorf("unable to issue query: %w", err) return @@ -1083,6 +1093,7 @@ func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) { // back a single response that signals completion. func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll now create our test gossip syncer that will shortly respond to // our canned query. @@ -1121,7 +1132,7 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) { }() // With our goroutine active, we'll now issue the query. - if err := syncer.replyChanRangeQuery(query); err != nil { + if err := syncer.replyChanRangeQuery(ctx, query); err != nil { t.Fatalf("unable to issue query: %v", err) } @@ -1162,6 +1173,7 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) { // channel ID, we properly generate an correct initial channel range response. func TestGossipSyncerGenChanRangeQuery(t *testing.T) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -1174,7 +1186,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) { // If we now ask the syncer to generate an initial range query, it // should return a start height that's back chanRangeQueryBuffer // blocks. - rangeQuery, err := syncer.genChanRangeQuery(false) + rangeQuery, err := syncer.genChanRangeQuery(ctx, false) require.NoError(t, err, "unable to resp") firstHeight := uint32(startingHeight - chanRangeQueryBuffer) @@ -1190,7 +1202,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) { // Generating a historical range query should result in a start height // of 0. - rangeQuery, err = syncer.genChanRangeQuery(true) + rangeQuery, err = syncer.genChanRangeQuery(ctx, true) require.NoError(t, err, "unable to resp") if rangeQuery.FirstBlockHeight != 0 { t.Fatalf("incorrect chan range query: expected %v, %v", 0, @@ -1222,6 +1234,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) { // each reply instead. func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { t.Parallel() + ctx := context.Background() // First, we'll create a GossipSyncer instance with a canned sendToPeer // message to allow us to intercept their potential sends. @@ -1234,7 +1247,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { startingState := syncer.state - query, err := syncer.genChanRangeQuery(true) + query, err := syncer.genChanRangeQuery(ctx, true) require.NoError(t, err, "unable to generate channel range query") currentTimestamp := time.Now().Unix() @@ -1359,13 +1372,13 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { // We'll begin by sending the syncer a set of non-complete channel // range replies. - if err := syncer.processChanRangeReply(replies[0]); err != nil { + if err := syncer.processChanRangeReply(ctx, replies[0]); err != nil { t.Fatalf("unable to process reply: %v", err) } - if err := syncer.processChanRangeReply(replies[1]); err != nil { + if err := syncer.processChanRangeReply(ctx, replies[1]); err != nil { t.Fatalf("unable to process reply: %v", err) } - if err := syncer.processChanRangeReply(replies[2]); err != nil { + if err := syncer.processChanRangeReply(ctx, replies[2]); err != nil { t.Fatalf("unable to process reply: %v", err) } @@ -1427,7 +1440,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { // If we send the final message, then we should transition to // queryNewChannels as we've sent a non-empty set of new channels. - if err := syncer.processChanRangeReply(replies[3]); err != nil { + if err := syncer.processChanRangeReply(ctx, replies[3]); err != nil { t.Fatalf("unable to process reply: %v", err) } @@ -1690,6 +1703,7 @@ func queryBatch(t *testing.T, // them. func TestGossipSyncerRoutineSync(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll modify the chunk size to be a smaller value, so we can ensure // our chunk parsing works properly. With this value we should get 3 @@ -1704,13 +1718,13 @@ func TestGossipSyncerRoutineSync(t *testing.T) { msgChan1, syncer1, chanSeries1 := newTestSyncer( highestID, defaultEncoding, chunkSize, true, false, ) - syncer1.Start() + syncer1.Start(ctx) defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( highestID, defaultEncoding, chunkSize, false, true, ) - syncer2.Start() + syncer2.Start(ctx) defer syncer2.Stop() // Although both nodes are at the same height, syncer will have 3 chan @@ -1837,6 +1851,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // final state and not perform any channel queries. func TestGossipSyncerAlreadySynced(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll modify the chunk size to be a smaller value, so we can ensure // our chunk parsing works properly. With this value we should get 3 @@ -1852,13 +1867,13 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { msgChan1, syncer1, chanSeries1 := newTestSyncer( highestID, defaultEncoding, chunkSize, ) - syncer1.Start() + syncer1.Start(ctx) defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( highestID, defaultEncoding, chunkSize, ) - syncer2.Start() + syncer2.Start(ctx) defer syncer2.Stop() // The channel state of both syncers will be identical. They should @@ -2058,6 +2073,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { // carries out its duties when accepting a new sync transition request. func TestGossipSyncerSyncTransitions(t *testing.T) { t.Parallel() + ctx := context.Background() assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message, msg lnwire.Message) { @@ -2178,7 +2194,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { // We'll then start the syncer in order to process the // request. - syncer.Start() + syncer.Start(ctx) defer syncer.Stop() syncer.ProcessSyncTransition(test.finalSyncType) @@ -2203,6 +2219,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { // historical sync with the remote peer. func TestGossipSyncerHistoricalSync(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll create a new gossip syncer and manually override its state to // chansSynced. This is necessary as the syncer can only process @@ -2214,7 +2231,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { syncer.setSyncType(PassiveSync) syncer.setSyncState(chansSynced) - syncer.Start() + syncer.Start(ctx) defer syncer.Stop() syncer.historicalSync() @@ -2247,6 +2264,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { // syncer reaches its terminal chansSynced state. func TestGossipSyncerSyncedSignal(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll create a new gossip syncer and manually override its state to // chansSynced. @@ -2261,7 +2279,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { signalChan := syncer.ResetSyncedSignal() // Starting the gossip syncer should cause the signal to be delivered. - syncer.Start() + syncer.Start(ctx) select { case <-signalChan: @@ -2280,7 +2298,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { syncer.setSyncState(chansSynced) - syncer.Start() + syncer.Start(ctx) defer syncer.Stop() signalChan = syncer.ResetSyncedSignal() @@ -2299,6 +2317,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { // said limit are not processed. func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) { t.Parallel() + ctx := context.Background() msgChan, syncer, chanSeries := newTestSyncer( lnwire.ShortChannelID{BlockHeight: latestKnownHeight}, @@ -2309,7 +2328,7 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) { // the sake of testing. syncer.cfg.maxQueryChanRangeReplies = 100 - syncer.Start() + syncer.Start(ctx) defer syncer.Stop() // Upon initialization, the syncer should submit a QueryChannelRange