discovery: thread contexts to syncer

The `GossiperSyncer` makes various calls to the `ChannelGraphTimeSeries`
interface which threads through to the graph DB. So in preparation for
threading context through to all the methods on that interface, we
update the GossipSyncer accordingly by passing contexts through.

Two `context.TODO()`s are added in this commit. They will be removed in
the upcoming commits.
This commit is contained in:
Elle Mouton
2025-04-07 10:17:35 +02:00
parent 4a30d6243d
commit 2a5235a79f
4 changed files with 89 additions and 55 deletions

View File

@@ -228,6 +228,7 @@ func newTestSyncer(hID lnwire.ShortChannelID,
// doesn't have a horizon set, then we won't send any incoming messages to it.
func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -249,7 +250,7 @@ func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
// We'll then attempt to filter the set of messages through the target
// peer.
syncer.FilterGossipMsgs(msgs...)
syncer.FilterGossipMsgs(ctx, msgs...)
// As the remote peer doesn't yet have a gossip timestamp set, we
// shouldn't receive any outbound messages.
@@ -273,6 +274,7 @@ func unixStamp(a int64) uint32 {
// channel ann that already has a channel update on disk.
func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -384,7 +386,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
}()
// We'll then instruct the gossiper to filter this set of messages.
syncer.FilterGossipMsgs(msgs...)
syncer.FilterGossipMsgs(ctx, msgs...)
// Out of all the messages we sent in, we should only get 2 of them
// back.
@@ -415,6 +417,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
// messages which are within their desired time horizon.
func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -451,7 +454,7 @@ func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
}()
// We'll now attempt to apply the gossip filter for the remote peer.
syncer.ApplyGossipFilter(remoteHorizon)
syncer.ApplyGossipFilter(ctx, remoteHorizon)
// Ensure that the syncer's remote horizon was properly updated.
if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
@@ -475,6 +478,7 @@ func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
// within their desired time horizon.
func TestGossipSyncerApplyGossipFilter(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -515,7 +519,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
}()
// We'll now attempt to apply the gossip filter for the remote peer.
err := syncer.ApplyGossipFilter(remoteHorizon)
err := syncer.ApplyGossipFilter(ctx, remoteHorizon)
require.NoError(t, err, "unable to apply filter")
// There should be no messages in the message queue as we didn't send
@@ -563,7 +567,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
errCh <- nil
}
}()
err = syncer.ApplyGossipFilter(remoteHorizon)
err = syncer.ApplyGossipFilter(ctx, remoteHorizon)
require.NoError(t, err, "unable to apply filter")
// We should get back the exact same message.
@@ -594,6 +598,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
// channels and complete=0.
func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -609,7 +614,7 @@ func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) {
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
}
err := syncer.replyChanRangeQuery(query)
err := syncer.replyChanRangeQuery(ctx, query)
require.NoError(t, err, "unable to process short chan ID's")
select {
@@ -646,6 +651,7 @@ func TestGossipSyncerQueryChannelRangeWrongChainHash(t *testing.T) {
// complete=0.
func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -656,7 +662,7 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
// We'll now ask the syncer to reply to a chan ID query, but for a
// chain that it isn't aware of.
err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
err := syncer.replyShortChanIDs(ctx, &lnwire.QueryShortChanIDs{
ChainHash: *chaincfg.SimNetParams.GenesisHash,
})
require.NoError(t, err, "unable to process short chan ID's")
@@ -695,6 +701,7 @@ func TestGossipSyncerReplyShortChanIDsWrongChainHash(t *testing.T) {
// announcements, as well as an ending ReplyShortChanIDsEnd message.
func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -745,7 +752,7 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
// With our set up above complete, we'll now attempt to obtain a reply
// from the channel syncer for our target chan ID query.
err := syncer.replyShortChanIDs(&lnwire.QueryShortChanIDs{
err := syncer.replyShortChanIDs(ctx, &lnwire.QueryShortChanIDs{
ShortChanIDs: queryChanIDs,
})
require.NoError(t, err, "unable to query for chan IDs")
@@ -800,6 +807,7 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
// the remote peer.
func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll use a smaller chunk size so we can easily test all the edge
// cases.
@@ -866,7 +874,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
}()
// With our goroutine active, we'll now issue the query.
if err := syncer.replyChanRangeQuery(query); err != nil {
if err := syncer.replyChanRangeQuery(ctx, query); err != nil {
t.Fatalf("unable to issue query: %v", err)
}
@@ -971,6 +979,7 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
// executed with the correct block range.
func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First create our test gossip syncer that will handle and
// respond to the test queries
@@ -1052,7 +1061,8 @@ func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) {
// will be reached
go func() {
for _, query := range queryReqs {
if err := syncer.replyChanRangeQuery(query); err != nil {
err := syncer.replyChanRangeQuery(ctx, query)
if err != nil {
errCh <- fmt.Errorf("unable to issue query: %w",
err)
return
@@ -1083,6 +1093,7 @@ func TestGossipSyncerReplyChanRangeQueryBlockRange(t *testing.T) {
// back a single response that signals completion.
func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll now create our test gossip syncer that will shortly respond to
// our canned query.
@@ -1121,7 +1132,7 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
}()
// With our goroutine active, we'll now issue the query.
if err := syncer.replyChanRangeQuery(query); err != nil {
if err := syncer.replyChanRangeQuery(ctx, query); err != nil {
t.Fatalf("unable to issue query: %v", err)
}
@@ -1162,6 +1173,7 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
// channel ID, we properly generate an correct initial channel range response.
func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -1174,7 +1186,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
// If we now ask the syncer to generate an initial range query, it
// should return a start height that's back chanRangeQueryBuffer
// blocks.
rangeQuery, err := syncer.genChanRangeQuery(false)
rangeQuery, err := syncer.genChanRangeQuery(ctx, false)
require.NoError(t, err, "unable to resp")
firstHeight := uint32(startingHeight - chanRangeQueryBuffer)
@@ -1190,7 +1202,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
// Generating a historical range query should result in a start height
// of 0.
rangeQuery, err = syncer.genChanRangeQuery(true)
rangeQuery, err = syncer.genChanRangeQuery(ctx, true)
require.NoError(t, err, "unable to resp")
if rangeQuery.FirstBlockHeight != 0 {
t.Fatalf("incorrect chan range query: expected %v, %v", 0,
@@ -1222,6 +1234,7 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
// each reply instead.
func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
t.Parallel()
ctx := context.Background()
// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
@@ -1234,7 +1247,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
startingState := syncer.state
query, err := syncer.genChanRangeQuery(true)
query, err := syncer.genChanRangeQuery(ctx, true)
require.NoError(t, err, "unable to generate channel range query")
currentTimestamp := time.Now().Unix()
@@ -1359,13 +1372,13 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
// We'll begin by sending the syncer a set of non-complete channel
// range replies.
if err := syncer.processChanRangeReply(replies[0]); err != nil {
if err := syncer.processChanRangeReply(ctx, replies[0]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if err := syncer.processChanRangeReply(replies[1]); err != nil {
if err := syncer.processChanRangeReply(ctx, replies[1]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if err := syncer.processChanRangeReply(replies[2]); err != nil {
if err := syncer.processChanRangeReply(ctx, replies[2]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
@@ -1427,7 +1440,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
// If we send the final message, then we should transition to
// queryNewChannels as we've sent a non-empty set of new channels.
if err := syncer.processChanRangeReply(replies[3]); err != nil {
if err := syncer.processChanRangeReply(ctx, replies[3]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
@@ -1690,6 +1703,7 @@ func queryBatch(t *testing.T,
// them.
func TestGossipSyncerRoutineSync(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
@@ -1704,13 +1718,13 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start()
syncer1.Start(ctx)
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start()
syncer2.Start(ctx)
defer syncer2.Stop()
// Although both nodes are at the same height, syncer will have 3 chan
@@ -1837,6 +1851,7 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// final state and not perform any channel queries.
func TestGossipSyncerAlreadySynced(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
@@ -1852,13 +1867,13 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize,
)
syncer1.Start()
syncer1.Start(ctx)
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize,
)
syncer2.Start()
syncer2.Start(ctx)
defer syncer2.Stop()
// The channel state of both syncers will be identical. They should
@@ -2058,6 +2073,7 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
// carries out its duties when accepting a new sync transition request.
func TestGossipSyncerSyncTransitions(t *testing.T) {
t.Parallel()
ctx := context.Background()
assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message,
msg lnwire.Message) {
@@ -2178,7 +2194,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
// We'll then start the syncer in order to process the
// request.
syncer.Start()
syncer.Start(ctx)
defer syncer.Stop()
syncer.ProcessSyncTransition(test.finalSyncType)
@@ -2203,6 +2219,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
// historical sync with the remote peer.
func TestGossipSyncerHistoricalSync(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll create a new gossip syncer and manually override its state to
// chansSynced. This is necessary as the syncer can only process
@@ -2214,7 +2231,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) {
syncer.setSyncType(PassiveSync)
syncer.setSyncState(chansSynced)
syncer.Start()
syncer.Start(ctx)
defer syncer.Stop()
syncer.historicalSync()
@@ -2247,6 +2264,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) {
// syncer reaches its terminal chansSynced state.
func TestGossipSyncerSyncedSignal(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll create a new gossip syncer and manually override its state to
// chansSynced.
@@ -2261,7 +2279,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
signalChan := syncer.ResetSyncedSignal()
// Starting the gossip syncer should cause the signal to be delivered.
syncer.Start()
syncer.Start(ctx)
select {
case <-signalChan:
@@ -2280,7 +2298,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
syncer.setSyncState(chansSynced)
syncer.Start()
syncer.Start(ctx)
defer syncer.Stop()
signalChan = syncer.ResetSyncedSignal()
@@ -2299,6 +2317,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
// said limit are not processed.
func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
t.Parallel()
ctx := context.Background()
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.ShortChannelID{BlockHeight: latestKnownHeight},
@@ -2309,7 +2328,7 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
// the sake of testing.
syncer.cfg.maxQueryChanRangeReplies = 100
syncer.Start()
syncer.Start(ctx)
defer syncer.Stop()
// Upon initialization, the syncer should submit a QueryChannelRange