From 81c063eb9636e447b3d834c70e6484c013a8bdac Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 18 Jun 2025 08:11:44 +0200 Subject: [PATCH] graph/db: thread context through to UpdateEdgePolicy --- autopilot/prefattach_test.go | 4 +-- discovery/gossiper.go | 6 ++-- discovery/gossiper_test.go | 4 +-- graph/builder.go | 16 +++++---- graph/builder_test.go | 14 ++++---- graph/db/graph.go | 6 ++-- graph/db/graph_test.go | 70 ++++++++++++++++++------------------ graph/db/interfaces.go | 2 +- graph/db/kv_store.go | 4 +-- graph/db/sql_store.go | 5 ++- graph/interfaces.go | 4 +-- graph/notifications_test.go | 4 +-- lnrpc/devrpc/dev_server.go | 6 ++-- routing/pathfind_test.go | 23 ++++++------ routing/router_test.go | 12 ++++--- 15 files changed, 94 insertions(+), 86 deletions(-) diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index 5127936fb..4e1bbf7b1 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -504,7 +504,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, ChannelFlags: 0, } - if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, nil, err } edgePolicy = &models.ChannelEdgePolicy{ @@ -519,7 +519,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey, MessageFlags: 1, ChannelFlags: 1, } - if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil { + if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index be36ac2cb..18b603e1b 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2288,7 +2288,7 @@ func (d *AuthenticatedGossiper) isMsgStale(_ context.Context, // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. -func (d *AuthenticatedGossiper) updateChannel(_ context.Context, +func (d *AuthenticatedGossiper) updateChannel(ctx context.Context, info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1, *lnwire.ChannelUpdate1, error) { @@ -2322,7 +2322,7 @@ func (d *AuthenticatedGossiper) updateChannel(_ context.Context, } // Finally, we'll write the new edge policy to disk. - if err := d.cfg.Graph.UpdateEdge(edge); err != nil { + if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil { return nil, nil, err } @@ -3263,7 +3263,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context, ExtraOpaqueData: upd.ExtraOpaqueData, } - if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil { + if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil { if graph.IsError( err, graph.ErrOutdated, graph.ErrIgnored, diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 0c845118e..dd8eca40d 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -161,8 +161,8 @@ func (r *mockGraphSource) queueValidationFail(chanID uint64) { r.chansToReject[chanID] = struct{}{} } -func (r *mockGraphSource) UpdateEdge(edge *models.ChannelEdgePolicy, - _ ...batch.SchedulerOption) error { +func (r *mockGraphSource) UpdateEdge(_ context.Context, + edge *models.ChannelEdgePolicy, _ ...batch.SchedulerOption) error { r.mu.Lock() defer func() { diff --git a/graph/builder.go b/graph/builder.go index df34cf28a..028e1c631 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -915,6 +915,8 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error { // ApplyChannelUpdate validates a channel update and if valid, applies it to the // database. It returns a bool indicating whether the updates were successful. func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { + ctx := context.TODO() + ch, _, _, err := b.GetChannelByID(msg.ShortChannelID) if err != nil { log.Errorf("Unable to retrieve channel by id: %v", err) @@ -959,7 +961,7 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { ExtraOpaqueData: msg.ExtraOpaqueData, } - err = b.UpdateEdge(update) + err = b.UpdateEdge(ctx, update) if err != nil && !IsError(err, ErrIgnored, ErrOutdated) { log.Errorf("Unable to apply channel update: %v", err) return false @@ -1118,10 +1120,10 @@ func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo, // considered as not fully constructed. // // NOTE: This method is part of the ChannelGraphSource interface. -func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (b *Builder) UpdateEdge(ctx context.Context, + update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - err := b.updateEdge(update, op...) + err := b.updateEdge(ctx, update, op...) if err != nil { logNetworkMsgProcessError(err) @@ -1135,8 +1137,8 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, // persisted in the graph, and then applies it to the graph if the update is // considered fresh enough and if we actually have a channel persisted for the // given update. -func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (b *Builder) updateEdge(ctx context.Context, + policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { log.Debugf("Received ChannelEdgePolicy for channel %v", policy.ChannelID) @@ -1209,7 +1211,7 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, // Now that we know this isn't a stale update, we'll apply the new edge // policy to the proper directional edge within the channel graph. - if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil { + if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil { err := errors.Errorf("unable to add channel: %v", err) log.Error(err) return err diff --git a/graph/builder_test.go b/graph/builder_test.go index 94dc8c057..3abb345e0 100644 --- a/graph/builder_test.go +++ b/graph/builder_test.go @@ -172,7 +172,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { // Attempt to update the edge. This should be ignored, since the edge // is not yet added to the router. - err = ctx.builder.UpdateEdge(edgePolicy) + err = ctx.builder.UpdateEdge(ctxb, edgePolicy) if !IsError(err, ErrIgnored) { t.Fatalf("expected to get ErrIgnore, instead got: %v", err) } @@ -183,7 +183,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { "though the vertexes were unknown: %v.", err) // Now updating the edge policy should succeed. - require.NoError(t, ctx.builder.UpdateEdge(edgePolicy)) + require.NoError(t, ctx.builder.UpdateEdge(ctxb, edgePolicy)) } // TestWakeUpOnStaleBranch tests that upon startup of the ChannelRouter, if the @@ -1205,7 +1205,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { FeeProportionalMillionths: 10000, } edgePolicy.ChannelFlags = 0 - if err := ctx.builder.UpdateEdge(edgePolicy); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edgePolicy); err != nil { t.Fatalf("unable to update edge policy: %v", err) } @@ -1219,7 +1219,7 @@ func TestIsStaleEdgePolicy(t *testing.T) { FeeProportionalMillionths: 10000, } edgePolicy.ChannelFlags = 1 - if err := ctx.builder.UpdateEdge(edgePolicy); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edgePolicy); err != nil { t.Fatalf("unable to update edge policy: %v", err) } @@ -1543,7 +1543,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( ), ToNode: targetNode, } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, err } @@ -1912,7 +1912,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, ToNode: node2Vertex, ExtraOpaqueData: getExtraData(node1), } - err := graph.UpdateEdgePolicy(edgePolicy) + err := graph.UpdateEdgePolicy(ctx, edgePolicy) if err != nil { return nil, err } @@ -1943,7 +1943,7 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, ToNode: node1Vertex, ExtraOpaqueData: getExtraData(node2), } - err := graph.UpdateEdgePolicy(edgePolicy) + err := graph.UpdateEdgePolicy(ctx, edgePolicy) if err != nil { return nil, err } diff --git a/graph/db/graph.go b/graph/db/graph.go index 1fda4afe3..7150107a2 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -565,10 +565,10 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, - op ...batch.SchedulerOption) error { +func (c *ChannelGraph) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - from, to, err := c.V1Store.UpdateEdgePolicy(edge, op...) + from, to, err := c.V1Store.UpdateEdgePolicy(ctx, edge, op...) if err != nil { return err } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 1b54a8832..bd8b62293 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -832,7 +832,7 @@ func TestEdgeInfoUpdates(t *testing.T) { // Make sure inserting the policy at this point, before the edge info // is added, will fail. - err := graph.UpdateEdgePolicy(edge1) + err := graph.UpdateEdgePolicy(ctx, edge1) require.ErrorIs(t, err, ErrEdgeNotFound) require.Len(t, graph.graphCache.nodeChannels, 0) @@ -847,11 +847,11 @@ func TestEdgeInfoUpdates(t *testing.T) { // Next, insert both edge policies into the database, they should both // be inserted without any issues. - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } assertEdgeWithPolicyInCache(t, graph, edgeInfo, edge1, true) - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } assertEdgeWithPolicyInCache(t, graph, edgeInfo, edge2, false) @@ -1227,14 +1227,14 @@ func TestForEachSourceNodeChannel(t *testing.T) { if !bytes.Equal(abPolicy1.ToNode[:], nodeB.PubKeyBytes[:]) { abPolicyAOutgoing = abPolicy2 } - require.NoError(t, graph.UpdateEdgePolicy(abPolicyAOutgoing)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, abPolicyAOutgoing)) // Now, set the incoming policy for the A-C channel. acPolicyAIncoming := acPolicy1 if !bytes.Equal(acPolicy1.ToNode[:], nodeA.PubKeyBytes[:]) { acPolicyAIncoming = acPolicy2 } - require.NoError(t, graph.UpdateEdgePolicy(acPolicyAIncoming)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, acPolicyAIncoming)) type sourceNodeChan struct { otherNode route.Vertex @@ -1566,7 +1566,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, edge.ChannelFlags = 0 edge.ToNode = node2.PubKeyBytes edge.SigBytes = testSig.Serialize() - require.NoError(t, graph.UpdateEdgePolicy(edge)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge)) // Create another random edge that points from // node2 -> node1 this time. @@ -1574,7 +1574,7 @@ func fillTestGraph(t testing.TB, graph *ChannelGraph, numNodes, edge.ChannelFlags = 1 edge.ToNode = node1.PubKeyBytes edge.SigBytes = testSig.Serialize() - require.NoError(t, graph.UpdateEdgePolicy(edge)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge)) chanIndex[chanID] = struct{}{} } @@ -1764,7 +1764,7 @@ func TestGraphPruning(t *testing.T) { edge.ChannelFlags = 0 edge.ToNode = graphNodes[i].PubKeyBytes edge.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -1774,7 +1774,7 @@ func TestGraphPruning(t *testing.T) { edge.ChannelFlags = 1 edge.ToNode = graphNodes[i].PubKeyBytes edge.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge); err != nil { t.Fatalf("unable to update edge: %v", err) } } @@ -1985,7 +1985,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node2.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -1995,7 +1995,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node1.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2770,12 +2770,14 @@ func TestFilterChannelRange(t *testing.T) { var updateTime = time.Unix(0, 0) if rand.Int31n(2) == 0 { updateTime = time.Unix(updateTimeSeed, 0) - err = graph.UpdateEdgePolicy(&models.ChannelEdgePolicy{ - ToNode: node.PubKeyBytes, - ChannelFlags: chanFlags, - ChannelID: chanID, - LastUpdate: updateTime, - }) + err = graph.UpdateEdgePolicy( + ctx, &models.ChannelEdgePolicy{ + ToNode: node.PubKeyBytes, + ChannelFlags: chanFlags, + ChannelID: chanID, + LastUpdate: updateTime, + }, + ) require.NoError(t, err) } updateTimeSeed++ @@ -2980,7 +2982,7 @@ func TestFetchChanInfos(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node2.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2988,7 +2990,7 @@ func TestFetchChanInfos(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node1.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3115,7 +3117,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { edgePolicy.ChannelFlags = 0 edgePolicy.ToNode = node2.PubKeyBytes edgePolicy.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3128,7 +3130,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { edgePolicy.ChannelFlags = 1 edgePolicy.ToNode = node1.PubKeyBytes edgePolicy.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3178,7 +3180,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node1.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. @@ -3187,7 +3189,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { edge2.ChannelFlags = 1 edge2.ToNode = node2.PubKeyBytes edge2.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions. @@ -3254,12 +3256,12 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { // removed from the update index. edge1.ChannelFlags = 2 edge1.LastUpdate = time.Now() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } edge2.ChannelFlags = 3 edge2.LastUpdate = edge1.LastUpdate.Add(time.Hour) - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3329,7 +3331,7 @@ func TestPruneGraphNodes(t *testing.T) { edge1.ChannelFlags = 0 edge1.ToNode = node1.PubKeyBytes edge1.SigBytes = testSig.Serialize() - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3613,7 +3615,7 @@ func TestDisabledChannelIDs(t *testing.T) { // Add one disabled policy and ensure the channel is still not in the // disabled list. edge1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } disabledChanIds, err = graph.DisabledChannelIDs() @@ -3626,7 +3628,7 @@ func TestDisabledChannelIDs(t *testing.T) { // Add second disabled policy and ensure the channel is now in the // disabled list. edge2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } disabledChanIds, err = graph.DisabledChannelIDs() @@ -3752,7 +3754,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { require.NoError(t, err, "error writing db") // And add the second, unmodified edge. - if err := graph.UpdateEdgePolicy(edge2); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -3774,7 +3776,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { // Now add the original, unmodified edge policy, and make sure the edge // policies then become fully populated. - if err := graph.UpdateEdgePolicy(edge1); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -4082,7 +4084,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { // Make sure inserting the policy at this point, before the edge info // is added, will fail. - require.Error(t, ErrEdgeNotFound, graph.UpdateEdgePolicy(edge1)) + require.Error(t, ErrEdgeNotFound, graph.UpdateEdgePolicy(ctx, edge1)) // Add the edge info. require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) @@ -4101,7 +4103,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { defer wg.Done() select { - case errChan <- graph.UpdateEdgePolicy(update): + case errChan <- graph.UpdateEdgePolicy(ctx, update): case <-time.After(2 * time.Second): errChan <- errTimeout } @@ -4226,7 +4228,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { FeeRate: 20, } edge1.InboundFee = fn.Some(inboundFee) - require.NoError(t, graph.UpdateEdgePolicy(edge1)) + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge1)) edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. directedChan := getSingleChannel() @@ -4241,7 +4243,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { // error when we try to update the edge policy. edge1.LastUpdate = edge1.LastUpdate.Add(time.Second) require.ErrorIs( - t, graph.UpdateEdgePolicy(edge1), ErrParsingExtraTLVBytes, + t, graph.UpdateEdgePolicy(ctx, edge1), ErrParsingExtraTLVBytes, ) // Since persistence of the last update failed, we should still bet diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index a6875252d..238688569 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -332,7 +332,7 @@ type V1Store interface { //nolint:interfacebloat // node's information. The node ordering is determined by the // lexicographical ordering of the identity public keys of the nodes on // either side of the channel. - UpdateEdgePolicy(edge *models.ChannelEdgePolicy, + UpdateEdgePolicy(ctx context.Context, edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) // SourceNode returns the source node of the graph. The source node is diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 6e443f716..b47e4fb32 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -2801,11 +2801,11 @@ func makeZombiePubkeys(info *models.ChannelEdgeInfo, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, +func (c *KVStore) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { var ( - ctx = context.TODO() isUpdate1 bool edgeNotFound bool from, to route.Vertex diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 31651b0e3..6cf64b1ed 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -577,11 +577,10 @@ func (s *SQLStore) HighestChanID() (uint64, error) { // nodes on either side of the channel. // // NOTE: part of the V1Store interface. -func (s *SQLStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, +func (s *SQLStore) UpdateEdgePolicy(ctx context.Context, + edge *models.ChannelEdgePolicy, opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) { - ctx := context.TODO() - var ( isUpdate1 bool edgeNotFound bool diff --git a/graph/interfaces.go b/graph/interfaces.go index 837673754..be226a495 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -39,7 +39,7 @@ type ChannelGraphSource interface { // UpdateEdge is used to update edge information, without this message // edge considered as not fully constructed. - UpdateEdge(policy *models.ChannelEdgePolicy, + UpdateEdge(ctx context.Context, policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error // IsStaleNode returns true if the graph source has a node announcement @@ -231,7 +231,7 @@ type DB interface { // node's information. The node ordering is determined by the // lexicographical ordering of the identity public keys of the nodes on // either side of the channel. - UpdateEdgePolicy(edge *models.ChannelEdgePolicy, + UpdateEdgePolicy(ctx context.Context, edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error // HasLightningNode determines if the graph has a vertex identified by diff --git a/graph/notifications_test.go b/graph/notifications_test.go index 03ea7aa8f..7b4fa69cb 100644 --- a/graph/notifications_test.go +++ b/graph/notifications_test.go @@ -484,10 +484,10 @@ func TestEdgeUpdateNotification(t *testing.T) { require.NoError(t, err, "unable to create a random chan policy") edge2.ChannelFlags = 1 - if err := ctx.builder.UpdateEdge(edge1); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edge1); err != nil { t.Fatalf("unable to add edge update: %v", err) } - if err := ctx.builder.UpdateEdge(edge2); err != nil { + if err := ctx.builder.UpdateEdge(ctxb, edge2); err != nil { t.Fatalf("unable to add edge update: %v", err) } diff --git a/lnrpc/devrpc/dev_server.go b/lnrpc/devrpc/dev_server.go index 2a278d074..d25118864 100644 --- a/lnrpc/devrpc/dev_server.go +++ b/lnrpc/devrpc/dev_server.go @@ -331,7 +331,8 @@ func (s *Server) ImportGraph(ctx context.Context, if rpcEdge.Node1Policy != nil { policy := makePolicy(rpcEdge.Node1Policy) policy.ChannelFlags = 0 - if err := graphDB.UpdateEdgePolicy(policy); err != nil { + err := graphDB.UpdateEdgePolicy(ctx, policy) + if err != nil { return nil, fmt.Errorf( "unable to update policy: %v", err) } @@ -340,7 +341,8 @@ func (s *Server) ImportGraph(ctx context.Context, if rpcEdge.Node2Policy != nil { policy := makePolicy(rpcEdge.Node2Policy) policy.ChannelFlags = 1 - if err := graphDB.UpdateEdgePolicy(policy); err != nil { + err := graphDB.UpdateEdgePolicy(ctx, policy) + if err != nil { return nil, fmt.Errorf( "unable to update policy: %v", err) } diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index 23f62579a..e1962f5d9 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -381,7 +381,7 @@ func parseTestGraph(t *testing.T, useCache bool, path string) ( FeeProportionalMillionths: lnwire.MilliSatoshi(edge.FeeRate), ToNode: targetNode, } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + if err := graph.UpdateEdgePolicy(ctx, edgePolicy); err != nil { return nil, err } @@ -719,7 +719,8 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, InboundFee: getInboundFees(node1), //nolint:ll ExtraOpaqueData: getExtraData(node1), } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + err := graph.UpdateEdgePolicy(ctx, edgePolicy) + if err != nil { return nil, err } } @@ -750,7 +751,8 @@ func createTestGraphFromChannels(t *testing.T, useCache bool, InboundFee: getInboundFees(node2), //nolint:ll ExtraOpaqueData: getExtraData(node2), } - if err := graph.UpdateEdgePolicy(edgePolicy); err != nil { + err := graph.UpdateEdgePolicy(ctx, edgePolicy) + if err != nil { return nil, err } } @@ -2155,9 +2157,8 @@ func runRouteFailMaxHTLC(t *testing.T, useCache bool) { require.NoError(t, err, "unable to fetch channel edges by ID") midEdge.MessageFlags = 1 midEdge.MaxHTLC = payAmt - 1 - if err := graph.UpdateEdgePolicy(midEdge); err != nil { - t.Fatalf("unable to update edge: %v", err) - } + err = graph.UpdateEdgePolicy(context.Background(), midEdge) + require.NoError(t, err) // We'll now attempt to route through that edge with a payment above // 100k msat, which should fail. @@ -2198,11 +2199,11 @@ func runRouteFailDisabledEdge(t *testing.T, useCache bool) { _, e1, e2, err := graph.graph.FetchChannelEdgesByID(roasToPham) require.NoError(t, err, "unable to fetch edge") e1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e1); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e1); err != nil { t.Fatalf("unable to update edge: %v", err) } e2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e2); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e2); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2219,7 +2220,7 @@ func runRouteFailDisabledEdge(t *testing.T, useCache bool) { _, e, _, err := graph.graph.FetchChannelEdgesByID(phamToSophon) require.NoError(t, err, "unable to fetch edge") e.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e); err != nil { t.Fatalf("unable to update edge: %v", err) } @@ -2301,11 +2302,11 @@ func runPathSourceEdgesBandwidth(t *testing.T, useCache bool) { _, e1, e2, err := graph.graph.FetchChannelEdgesByID(roasToSongoku) require.NoError(t, err, "unable to fetch edge") e1.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e1); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e1); err != nil { t.Fatalf("unable to update edge: %v", err) } e2.ChannelFlags |= lnwire.ChanUpdateDisabled - if err := graph.graph.UpdateEdgePolicy(e2); err != nil { + if err := graph.graph.UpdateEdgePolicy(ctx, e2); err != nil { t.Fatalf("unable to update edge: %v", err) } diff --git a/routing/router_test.go b/routing/router_test.go index 385d4442f..3394e2427 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -2760,7 +2760,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 0 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // Create edge in the other direction as well. edgePolicy = &models.ChannelEdgePolicy{ @@ -2775,7 +2775,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 1 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // After adding the edge between the two previously unknown nodes, they // should have been added to the graph. @@ -2838,7 +2838,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 0 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) edgePolicy = &models.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), @@ -2852,7 +2852,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { } edgePolicy.ChannelFlags = 1 - require.NoError(t, ctx.graph.UpdateEdgePolicy(edgePolicy)) + require.NoError(t, ctx.graph.UpdateEdgePolicy(ctxb, edgePolicy)) // We should now be able to find a route to node 2. paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -2943,7 +2943,9 @@ type mockGraphBuilder struct { func newMockGraphBuilder(graph graph.DB) *mockGraphBuilder { return &mockGraphBuilder{ updateEdge: func(update *models.ChannelEdgePolicy) error { - return graph.UpdateEdgePolicy(update) + return graph.UpdateEdgePolicy( + context.Background(), update, + ) }, } }