From 3fdb9b84f430b6f806b7c3cefcc92fbb714b9231 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 17 Jun 2025 21:10:17 +0200 Subject: [PATCH] graph/db: thread context to AddChannelEdge --- autopilot/prefattach_test.go | 2 +- discovery/gossiper.go | 2 +- discovery/gossiper_test.go | 4 +- graph/builder.go | 8 ++-- graph/builder_test.go | 35 +++++++++------ graph/db/graph.go | 6 +-- graph/db/graph_test.go | 82 +++++++++++++++++++----------------- graph/db/interfaces.go | 2 +- graph/db/kv_store.go | 6 +-- graph/db/sql_store.go | 6 +-- graph/interfaces.go | 4 +- graph/notifications_test.go | 10 +++-- lnrpc/devrpc/dev_server.go | 2 +- routing/pathfind_test.go | 4 +- routing/router_test.go | 4 +- server.go | 2 +- 16 files changed, 94 insertions(+), 85 deletions(-) diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index 8d785a81a..5127936fb 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -488,7 +488,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, Capacity: capacity, } edge.AddNodeKeys(lnNode1, lnNode2, lnNode1, lnNode2) - if err := d.db.AddChannelEdge(edge); err != nil { + if err := d.db.AddChannelEdge(ctx, edge); err != nil { return nil, nil, err } edgePolicy := &models.ChannelEdgePolicy{ diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 2f390da5b..be36ac2cb 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2808,7 +2808,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context, // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be // added to represent each node while we wait for a node announcement. - err = d.cfg.Graph.AddEdge(edge, ops...) + err = d.cfg.Graph.AddEdge(ctx, edge, ops...) if err != nil { log.Debugf("Graph rejected edge for short_chan_id(%v): %v", scid.ToUint64(), err) diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index fb219f41c..0c845118e 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -136,8 +136,8 @@ func (r *mockGraphSource) IsZombieEdge(chanID lnwire.ShortChannelID) (bool, return ok, nil } -func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo, - _ ...batch.SchedulerOption) error { +func (r *mockGraphSource) AddEdge(_ context.Context, + info *models.ChannelEdgeInfo, _ ...batch.SchedulerOption) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/graph/builder.go b/graph/builder.go index ce8dff179..df34cf28a 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -1017,10 +1017,10 @@ func (b *Builder) addNode(ctx context.Context, node *models.LightningNode, // in construction of payment path. // // NOTE: This method is part of the ChannelGraphSource interface. -func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, +func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - err := b.addEdge(edge, op...) + err := b.addEdge(ctx, edge, op...) if err != nil { logNetworkMsgProcessError(err) @@ -1038,7 +1038,7 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, // // TODO(elle): this currently also does funding-transaction validation. But this // should be moved to the gossiper instead. -func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, +func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID) @@ -1061,7 +1061,7 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, edge.ChannelID) } - if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil { + if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil { return fmt.Errorf("unable to add edge: %w", err) } diff --git a/graph/builder_test.go b/graph/builder_test.go index 4da93297d..94dc8c057 100644 --- a/graph/builder_test.go +++ b/graph/builder_test.go @@ -44,6 +44,7 @@ const ( // info was added to the database. func TestAddProof(t *testing.T) { t.Parallel() + ctxb := context.Background() ctx := createTestCtxSingleNode(t, 0) @@ -75,7 +76,7 @@ func TestAddProof(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - require.NoError(t, ctx.builder.AddEdge(edge)) + require.NoError(t, ctx.builder.AddEdge(ctxb, edge)) // Now we'll attempt to update the proof and check that it has been // properly updated. @@ -117,6 +118,7 @@ func TestIgnoreNodeAnnouncement(t *testing.T) { // ignore a channel policy for a channel not in the graph. func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 @@ -176,9 +178,9 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { } // Add the edge. - require.NoErrorf(t, ctx.builder.AddEdge(edge), "expected to be able "+ - "to add edge to the channel graph, even though the vertexes "+ - "were unknown: %v.", err) + require.NoErrorf(t, ctx.builder.AddEdge(ctxb, edge), + "expected to be able to add edge to the channel graph, even "+ + "though the vertexes were unknown: %v.", err) // Now updating the edge policy should succeed. require.NoError(t, ctx.builder.UpdateEdge(edgePolicy)) @@ -190,6 +192,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { // confirmed on the stale chain, and resync to the main chain. func TestWakeUpOnStaleBranch(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -283,7 +286,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -302,7 +305,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { copy(edge2.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge2.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge2); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -401,6 +404,7 @@ func TestWakeUpOnStaleBranch(t *testing.T) { // it is active. func TestDisconnectedBlocks(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -492,7 +496,7 @@ func TestDisconnectedBlocks(t *testing.T) { copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -513,7 +517,7 @@ func TestDisconnectedBlocks(t *testing.T) { copy(edge2.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge2.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge2); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -599,6 +603,7 @@ func TestDisconnectedBlocks(t *testing.T) { // ChannelRouter, then the channels are properly pruned. func TestChansClosedOfflinePruneGraph(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -644,7 +649,7 @@ func TestChansClosedOfflinePruneGraph(t *testing.T) { } copy(edge1.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge1.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge1); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1047,7 +1052,7 @@ func TestIsStaleNode(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1092,6 +1097,7 @@ func TestIsStaleNode(t *testing.T) { // channel announcements. func TestIsKnownEdge(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -1125,7 +1131,7 @@ func TestIsKnownEdge(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1140,6 +1146,7 @@ func TestIsKnownEdge(t *testing.T) { // stale channel edge update announcements. func TestIsStaleEdgePolicy(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxFromFile(t, startingBlockHeight, basicGraphFilePath) @@ -1183,7 +1190,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { AuthProof: nil, FundingScript: fn.Some(script), } - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -1501,7 +1508,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } @@ -1861,7 +1868,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, BitcoinKey2Bytes: node2Vertex, } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { diff --git a/graph/db/graph.go b/graph/db/graph.go index fb2a9975a..1fda4afe3 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -307,10 +307,10 @@ func (c *ChannelGraph) DeleteLightningNode(ctx context.Context, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo, - op ...batch.SchedulerOption) error { +func (c *ChannelGraph) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - err := c.V1Store.AddChannelEdge(edge, op...) + err := c.V1Store.AddChannelEdge(ctx, edge, op...) if err != nil { return err } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 59488d4e0..1b54a8832 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -276,7 +276,7 @@ func TestPartialNode(t *testing.T) { // Create an edge attached to these nodes and add it to the graph. edgeInfo, _ := createEdge(140, 0, 0, 0, &node1, &node2) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) // Both of the nodes should now be in both the graph (as partial/shell) // nodes _and_ the cache should also have an awareness of both nodes. @@ -390,6 +390,7 @@ func TestSourceNode(t *testing.T) { // TestEdgeInsertionDeletion tests the basic CRUD operations for channel edges. func TestEdgeInsertionDeletion(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) @@ -429,12 +430,12 @@ func TestEdgeInsertionDeletion(t *testing.T) { copy(edgeInfo.BitcoinKey1Bytes[:], node1Pub.SerializeCompressed()) copy(edgeInfo.BitcoinKey2Bytes[:], node2Pub.SerializeCompressed()) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) assertEdgeWithNoPoliciesInCache(t, graph, &edgeInfo) // Show that trying to insert the same channel again will return the // expected error. - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) require.ErrorIs(t, err, ErrEdgeAlreadyExist) // Ensure that both policies are returned as unknown (nil). @@ -562,15 +563,15 @@ func TestDisconnectBlockAtHeight(t *testing.T) { edgeInfo3, _ := createEdge(height-1, 0, 0, 2, node1, node2) // Now add all these new edges to the database. - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edgeInfo2); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo2); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edgeInfo3); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo3); err != nil { t.Fatalf("unable to create channel edge: %v", err) } assertEdgeWithNoPoliciesInCache(t, graph, &edgeInfo) @@ -836,7 +837,7 @@ func TestEdgeInfoUpdates(t *testing.T) { require.Len(t, graph.graphCache.nodeChannels, 0) // Add the edge info. - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } assertEdgeWithNoPoliciesInCache(t, graph, edgeInfo) @@ -1127,6 +1128,7 @@ func newEdgePolicy(chanID uint64, updateTime int64) *models.ChannelEdgePolicy { // TestAddEdgeProof tests the ability to add an edge proof to an existing edge. func TestAddEdgeProof(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraph(t) @@ -1134,7 +1136,7 @@ func TestAddEdgeProof(t *testing.T) { node1 := createTestVertex(t) node2 := createTestVertex(t) edge1, _, _ := createChannelEdge(node1, node2, withSkipProofs()) - require.NoError(t, graph.AddChannelEdge(edge1)) + require.NoError(t, graph.AddChannelEdge(ctx, edge1)) // Fetch the edge and assert that the proof is nil and that the rest // of the edge info is correct. @@ -1155,7 +1157,7 @@ func TestAddEdgeProof(t *testing.T) { // to call AddChannelEdge again - this should fail due to the channel // already existing. edge1.AuthProof = proof - err = graph.AddChannelEdge(edge1) + err = graph.AddChannelEdge(ctx, edge1) require.Error(t, err, ErrEdgeAlreadyExist) // Now add just the proof. @@ -1171,7 +1173,7 @@ func TestAddEdgeProof(t *testing.T) { // For completeness, also test the case where we insert a new edge with // an edge proof. Show that the proof is present from the get go. edge2, _, _ := createChannelEdge(node1, node2) - require.NoError(t, graph.AddChannelEdge(edge2)) + require.NoError(t, graph.AddChannelEdge(ctx, edge2)) // Fetch the edge and assert that the proof is nil and that the rest // of the edge info is correct. @@ -1211,11 +1213,11 @@ func TestForEachSourceNodeChannel(t *testing.T) { nodeD := createTestVertex(t) abEdge, abPolicy1, abPolicy2 := createChannelEdge(nodeA, nodeB) - require.NoError(t, graph.AddChannelEdge(abEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, abEdge)) acEdge, acPolicy1, acPolicy2 := createChannelEdge(nodeA, nodeC) - require.NoError(t, graph.AddChannelEdge(acEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, acEdge)) bdEdge, _, _ := createChannelEdge(nodeB, nodeD) - require.NoError(t, graph.AddChannelEdge(bdEdge)) + require.NoError(t, graph.AddChannelEdge(ctx, bdEdge)) // Figure out which of the policies returned above are node A's so that // we know which to persist. @@ -1555,7 +1557,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, copy(edgeInfo.NodeKey2Bytes[:], node2.PubKeyBytes[:]) copy(edgeInfo.BitcoinKey1Bytes[:], node1.PubKeyBytes[:]) copy(edgeInfo.BitcoinKey2Bytes[:], node2.PubKeyBytes[:]) - err := graph.AddChannelEdge(&edgeInfo) + err := graph.AddChannelEdge(ctx, &edgeInfo) require.NoError(t, err) // Create and add an edge with random data that points @@ -1740,7 +1742,7 @@ func TestGraphPruning(t *testing.T) { edgeInfo.BitcoinKey2Bytes[:], graphNodes[i+1].PubKeyBytes[:], ) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add node: %v", err) } @@ -1873,6 +1875,7 @@ func TestGraphPruning(t *testing.T) { // known channel ID in the database. func TestHighestChanID(t *testing.T) { t.Parallel() + ctx := context.Background() graph := MakeTestGraphNew(t) @@ -1895,10 +1898,10 @@ func TestHighestChanID(t *testing.T) { edge1, _ := createEdge(10, 0, 0, 0, node1, node2) edge2, chanID2 := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edge1); err != nil { + if err := graph.AddChannelEdge(ctx, &edge1); err != nil { t.Fatalf("unable to create channel edge: %v", err) } - if err := graph.AddChannelEdge(&edge2); err != nil { + if err := graph.AddChannelEdge(ctx, &edge2); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -1915,7 +1918,7 @@ func TestHighestChanID(t *testing.T) { // If we add another edge, then the current best chan ID should be // updated as well. edge3, chanID3 := createEdge(1000, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edge3); err != nil { + if err := graph.AddChannelEdge(ctx, &edge3); err != nil { t.Fatalf("unable to create channel edge: %v", err) } bestID, err = graph.HighestChanID() @@ -1968,7 +1971,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -2309,7 +2312,7 @@ func TestFilterKnownChanIDs(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -2324,7 +2327,7 @@ func TestFilterKnownChanIDs(t *testing.T) { channel, chanID := createEdge( uint32(i*10+1), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } err := graph.DeleteChannelEdges(false, true, channel.ChannelID) @@ -2681,7 +2684,7 @@ func TestStressTestChannelGraphAPI(t *testing.T) { fn: func() error { channel := addNewChan() - return graph.AddChannelEdge(&channel.info) + return graph.AddChannelEdge(ctx, &channel.info) }, }, } @@ -2785,12 +2788,12 @@ func TestFilterChannelRange(t *testing.T) { channel1, chanID1 := createEdge( chanHeight, uint32(i+1), 0, 0, node1, node2, ) - require.NoError(t, graph.AddChannelEdge(&channel1)) + require.NoError(t, graph.AddChannelEdge(ctx, &channel1)) channel2, chanID2 := createEdge( chanHeight, uint32(i+2), 0, 0, node1, node2, ) - require.NoError(t, graph.AddChannelEdge(&channel2)) + require.NoError(t, graph.AddChannelEdge(ctx, &channel2)) chanInfo1 := NewChannelUpdateInfo( chanID1, time.Time{}, time.Time{}, @@ -2966,7 +2969,7 @@ func TestFetchChanInfos(t *testing.T) { uint32(i*10), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3007,7 +3010,7 @@ func TestFetchChanInfos(t *testing.T) { zombieChan, zombieChanID := createEdge( 666, 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&zombieChan); err != nil { + if err := graph.AddChannelEdge(ctx, &zombieChan); err != nil { t.Fatalf("unable to create channel edge: %v", err) } err := graph.DeleteChannelEdges(false, true, zombieChan.ChannelID) @@ -3060,7 +3063,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { uint32(0), 0, 0, 0, node1, node2, ) - if err := graph.AddChannelEdge(&channel); err != nil { + if err := graph.AddChannelEdge(ctx, &channel); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3167,7 +3170,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { // With the two nodes created, we'll now create a random channel, as // well as two edges in the database with distinct update times. edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -3316,7 +3319,7 @@ func TestPruneGraphNodes(t *testing.T) { // We'll now add a new edge to the graph, but only actually advertise // the edge of *one* of the nodes. edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2) - if err := graph.AddChannelEdge(&edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, &edgeInfo); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -3365,7 +3368,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { // We'll now create an edge between the two nodes, as a result, node2 // should be inserted into the database as a shell node. edgeInfo, _ := createEdge(100, 0, 0, 0, node1, node2) - require.NoError(t, graph.AddChannelEdge(&edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, &edgeInfo)) // Ensure that node1 was inserted as a full node, while node2 only has // a shell node present. @@ -3379,7 +3382,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { // Show that attempting to add the channel again will result in an // error. - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) require.ErrorIs(t, err, ErrEdgeAlreadyExist) // Show that updating the shell node to a full node record works. @@ -3480,7 +3483,7 @@ func TestNodeIsPublic(t *testing.T) { require.NoError(t, err) } for _, edge := range edges { - if err := graph.AddChannelEdge(edge); err != nil { + if err := graph.AddChannelEdge(ctx, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } } @@ -3554,7 +3557,7 @@ func TestNodeIsPublic(t *testing.T) { } bobCarolEdge.AuthProof = nil - if err := graph.AddChannelEdge(&bobCarolEdge); err != nil { + if err := graph.AddChannelEdge(ctx, &bobCarolEdge); err != nil { t.Fatalf("unable to add edge: %v", err) } } @@ -3595,7 +3598,7 @@ func TestDisabledChannelIDs(t *testing.T) { t.Fatalf("unable to add node: %v", err) } - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3679,7 +3682,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { if err := graph.AddLightningNode(ctx, node2); err != nil { t.Fatalf("unable to add node: %v", err) } - if err := graph.AddChannelEdge(edgeInfo); err != nil { + if err := graph.AddChannelEdge(ctx, edgeInfo); err != nil { t.Fatalf("unable to create channel edge: %v", err) } @@ -3803,6 +3806,7 @@ func assertNumZombies(t *testing.T, graph *ChannelGraph, expZombies uint64) { // TestGraphZombieIndex ensures that we can mark edges correctly as zombie/live. func TestGraphZombieIndex(t *testing.T) { t.Parallel() + ctx := context.Background() // We'll start by creating our test graph along with a test edge. graph := MakeTestGraph(t) @@ -3817,7 +3821,7 @@ func TestGraphZombieIndex(t *testing.T) { } edge, _, _ := createChannelEdge(node1, node2) - require.NoError(t, graph.AddChannelEdge(edge)) + require.NoError(t, graph.AddChannelEdge(ctx, edge)) // Since the edge is known the graph and it isn't a zombie, IsZombieEdge // should not report the channel as a zombie. @@ -4044,7 +4048,7 @@ func TestBatchedAddChannelEdge(t *testing.T) { defer wg.Done() select { - case errChan <- graph.AddChannelEdge(&edge): + case errChan <- graph.AddChannelEdge(ctx, &edge): case <-time.After(2 * time.Second): errChan <- errTimeout } @@ -4081,7 +4085,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { require.Error(t, ErrEdgeNotFound, graph.UpdateEdgePolicy(edge1)) // Add the edge info. - require.NoError(t, graph.AddChannelEdge(edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) errTimeout := errors.New("timeout adding batched channel") @@ -4192,7 +4196,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { } // Add the channel, but only insert a single edge into the graph. - require.NoError(t, graph.AddChannelEdge(edgeInfo)) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) getSingleChannel := func() *DirectedChannel { var ch *DirectedChannel diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 60833c5dd..a6875252d 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -189,7 +189,7 @@ type V1Store interface { //nolint:interfacebloat // and the set of features that the channel supports. The chanPoint and // chanID are used to uniquely identify the edge globally within the // database. - AddChannelEdge(edge *models.ChannelEdgeInfo, + AddChannelEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // HasChannelEdge returns true if the database knows of a channel edge diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 4a10bd1a9..6e443f716 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -1098,10 +1098,8 @@ func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, - opts ...batch.SchedulerOption) error { - - ctx := context.TODO() +func (c *KVStore) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, opts ...batch.SchedulerOption) error { var alreadyExists bool r := &batch.Request[kvdb.RwTx]{ diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 90c4d34a1..31651b0e3 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -501,10 +501,8 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime, // globally within the database. // // NOTE: part of the V1Store interface. -func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo, - opts ...batch.SchedulerOption) error { - - ctx := context.TODO() +func (s *SQLStore) AddChannelEdge(ctx context.Context, + edge *models.ChannelEdgeInfo, opts ...batch.SchedulerOption) error { var alreadyExists bool r := &batch.Request[SQLQueries]{ diff --git a/graph/interfaces.go b/graph/interfaces.go index 2d1e98568..837673754 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -29,7 +29,7 @@ type ChannelGraphSource interface { // AddEdge is used to add edge/channel to the topology of the router, // after all information about channel will be gathered this // edge/channel might be used in construction of payment path. - AddEdge(edge *models.ChannelEdgeInfo, + AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // AddProof updates the channel edge info with proof which is needed to @@ -215,7 +215,7 @@ type DB interface { // and the set of features that the channel supports. The chanPoint and // chanID are used to uniquely identify the edge globally within the // database. - AddChannelEdge(edge *models.ChannelEdgeInfo, + AddChannelEdge(ctx context.Context, edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error // MarkEdgeZombie attempts to mark a channel identified by its channel diff --git a/graph/notifications_test.go b/graph/notifications_test.go index 91f2b2345..03ea7aa8f 100644 --- a/graph/notifications_test.go +++ b/graph/notifications_test.go @@ -422,6 +422,7 @@ func (m *mockChainView) FilterBlock(blockHash *chainhash.Hash) (*chainview.Filte // a proper notification is sent of to all registered clients. func TestEdgeUpdateNotification(t *testing.T) { t.Parallel() + ctxb := context.Background() ctx := createTestCtxSingleNode(t, 0) @@ -464,7 +465,7 @@ func TestEdgeUpdateNotification(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -657,7 +658,7 @@ func TestNodeUpdateNotification(t *testing.T) { // Adding the edge will add the nodes to the graph, but with no info // except the pubkey known. - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -844,7 +845,7 @@ func TestNotificationCancellation(t *testing.T) { } copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } @@ -875,6 +876,7 @@ func TestNotificationCancellation(t *testing.T) { // properly dispatched to all registered clients. func TestChannelCloseNotification(t *testing.T) { t.Parallel() + ctxb := context.Background() const startingBlockHeight = 101 ctx := createTestCtxSingleNode(t, startingBlockHeight) @@ -918,7 +920,7 @@ func TestChannelCloseNotification(t *testing.T) { } copy(edge.BitcoinKey1Bytes[:], bitcoinKey1.SerializeCompressed()) copy(edge.BitcoinKey2Bytes[:], bitcoinKey2.SerializeCompressed()) - if err := ctx.builder.AddEdge(edge); err != nil { + if err := ctx.builder.AddEdge(ctxb, edge); err != nil { t.Fatalf("unable to add edge: %v", err) } diff --git a/lnrpc/devrpc/dev_server.go b/lnrpc/devrpc/dev_server.go index a28a2500a..2a278d074 100644 --- a/lnrpc/devrpc/dev_server.go +++ b/lnrpc/devrpc/dev_server.go @@ -293,7 +293,7 @@ func (s *Server) ImportGraph(ctx context.Context, } edge.ChannelPoint = *channelPoint - if err := graphDB.AddChannelEdge(edge); err != nil { + if err := graphDB.AddChannelEdge(ctx, edge); err != nil { return nil, fmt.Errorf("unable to add edge %v: %w", rpcEdge.ChanPoint, err) } diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index c4aeb5af1..23f62579a 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -356,7 +356,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } @@ -666,7 +666,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, BitcoinKey2Bytes: node2Vertex, } - err = graph.AddChannelEdge(&edgeInfo) + err = graph.AddChannelEdge(ctx, &edgeInfo) if err != nil && !errors.Is(err, graphdb.ErrEdgeAlreadyExist) { return nil, err } diff --git a/routing/router_test.go b/routing/router_test.go index 0dc42e18e..385d4442f 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -2744,7 +2744,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { BitcoinKey2Bytes: pub2, AuthProof: nil, } - require.NoError(t, ctx.graph.AddChannelEdge(edge)) + require.NoError(t, ctx.graph.AddChannelEdge(ctxb, edge)) // We must add the edge policy to be able to use the edge for route // finding. @@ -2824,7 +2824,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { copy(edge.BitcoinKey1Bytes[:], node1Bytes) edge.BitcoinKey2Bytes = node2Bytes - require.NoError(t, ctx.graph.AddChannelEdge(edge)) + require.NoError(t, ctx.graph.AddChannelEdge(ctxb, edge)) edgePolicy = &models.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), diff --git a/server.go b/server.go index 06f271bbd..ce728caf5 100644 --- a/server.go +++ b/server.go @@ -1225,7 +1225,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, FetchChannel: s.chanStateDB.FetchChannel, AddEdge: func(edge *models.ChannelEdgeInfo) error { - return s.graphBuilder.AddEdge(edge) + return s.graphBuilder.AddEdge(context.TODO(), edge) }, }