diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index fd557a02c..282e32625 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -736,7 +736,8 @@ func (t *testNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo, policy2 *models.ChannelEdgePolicy) error { return f(edge, policy1, policy2) - }) + }, func() {}, + ) } func (t *testNodeTx) FetchNode(pub route.Vertex) (graphdb.NodeRTx, error) { diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 70c1caf2e..c4677bbf5 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1758,6 +1758,9 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context, } return nil + }, func() { + havePublicChannels = false + edgesToUpdate = nil }) if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { return fmt.Errorf("unable to retrieve outgoing channels: %w", diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index c10bdbbe1..6b9a1d58f 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -213,7 +213,7 @@ func (r *mockGraphSource) ForEachNode( func (r *mockGraphSource) ForAllOutgoingChannels(_ context.Context, cb func(i *models.ChannelEdgeInfo, - c *models.ChannelEdgePolicy) error) error { + c *models.ChannelEdgePolicy) error, _ func()) error { r.mu.Lock() defer r.mu.Unlock() @@ -3728,10 +3728,8 @@ out: }) return nil - }) - if err != nil { - t.Fatal(err) - } + }, func() {}) + require.NoError(t, err) err = ctx.gossiper.PropagateChanPolicyUpdate(edgesToUpdate) require.NoError(t, err, "unable to chan policies") diff --git a/graph/builder.go b/graph/builder.go index 073716a83..5cfb79e4c 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -1276,10 +1276,11 @@ func (b *Builder) FetchLightningNode(ctx context.Context, // // NOTE: This method is part of the ChannelGraphSource interface. func (b *Builder) ForAllOutgoingChannels(ctx context.Context, - cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy) error) error { + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error, + reset func()) error { - return b.cfg.Graph.ForEachNodeChannel(ctx, b.cfg.SelfNode, + return b.cfg.Graph.ForEachNodeChannel( + ctx, b.cfg.SelfNode, func(c *models.ChannelEdgeInfo, e *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { @@ -1289,7 +1290,7 @@ func (b *Builder) ForAllOutgoingChannels(ctx context.Context, } return cb(c, e) - }, + }, reset, ) } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 50d288cdc..8d62f65d4 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1394,7 +1394,8 @@ func TestGraphTraversal(t *testing.T) { // outgoing channels for a particular node. numNodeChans := 0 firstNode, secondNode := nodeList[0], nodeList[1] - err = graph.ForEachNodeChannel(ctx, firstNode.PubKeyBytes, + err = graph.ForEachNodeChannel( + ctx, firstNode.PubKeyBytes, func(_ *models.ChannelEdgeInfo, outEdge, inEdge *models.ChannelEdgePolicy) error { @@ -1425,7 +1426,8 @@ func TestGraphTraversal(t *testing.T) { numNodeChans++ return nil - }) + }, func() {}, + ) require.NoError(t, err) require.Equal(t, numChannels, numNodeChans) } @@ -3144,7 +3146,8 @@ func TestIncompleteChannelPolicies(t *testing.T) { expectedOut bool) { calls := 0 - err := graph.ForEachNodeChannel(ctx, node.PubKeyBytes, + err := graph.ForEachNodeChannel( + ctx, node.PubKeyBytes, func(_ *models.ChannelEdgeInfo, outEdge, inEdge *models.ChannelEdgePolicy) error { @@ -3167,14 +3170,10 @@ func TestIncompleteChannelPolicies(t *testing.T) { calls++ return nil - }) - if err != nil { - t.Fatalf("unable to scan channels: %v", err) - } - - if calls != 1 { - t.Fatalf("Expected only one callback call") - } + }, func() {}, + ) + require.NoError(t, err) + require.Equal(t, 1, calls) } checkPolicies(node2, false, false) @@ -4248,7 +4247,7 @@ func BenchmarkForEachChannel(b *testing.B) { return nil } - err := graph.ForEachNodeChannel(ctx, n, cb) + err := graph.ForEachNodeChannel(ctx, n, cb, func() {}) require.NoError(b, err) } } diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 652e7bd34..8c4b0c855 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -86,7 +86,7 @@ type V1Store interface { //nolint:interfacebloat // Unknown policies are passed into the callback as nil values. ForEachNodeChannel(ctx context.Context, nodePub route.Vertex, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error + *models.ChannelEdgePolicy) error, reset func()) error // ForEachNodeCached is similar to forEachNode, but it returns // DirectedChannel data to the call-back. diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 2a52848bf..19d412d83 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -406,7 +406,7 @@ func (c *KVStore) AddrsForNode(ctx context.Context, // callback. func (c *KVStore) ForEachChannel(_ context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { + *models.ChannelEdgePolicy) error, reset func()) error { return forEachChannel(c.db, cb, reset) } @@ -679,7 +679,7 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) ( // NOTE: The callback contents MUST not be modified. func (c *KVStore) ForEachNodeCached(_ context.Context, cb func(node route.Vertex, - chans map[uint64]*DirectedChannel) error) error { + chans map[uint64]*DirectedChannel) error) error { reset := func() {} @@ -3182,7 +3182,7 @@ func (c *KVStore) HasLightningNode(_ context.Context, // function's View/Update call which will then apply to the whole transaction. func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { + *models.ChannelEdgePolicy) error, reset func()) error { traversal := func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) @@ -3271,9 +3271,7 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, // Unknown policies are passed into the callback as nil values. func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { - - reset := func() {} + *models.ChannelEdgePolicy) error, reset func()) error { return nodeTraversal( nil, nodePub[:], c.db, func(_ kvdb.RTx, @@ -3291,7 +3289,7 @@ func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex, // peer's node information. func (c *KVStore) ForEachSourceNodeChannel(_ context.Context, cb func(chanPoint wire.OutPoint, havePolicy bool, - otherNode *models.LightningNode) error, reset func()) error { + otherNode *models.LightningNode) error, reset func()) error { return kvdb.View(c.db, func(tx kvdb.RTx) error { nodes := tx.ReadBucket(nodeBucket) @@ -3341,7 +3339,7 @@ func (c *KVStore) ForEachSourceNodeChannel(_ context.Context, // NOTE: the reset function is only meaningful if the tx param is nil. func (c *KVStore) forEachNodeChannelTx(tx kvdb.RTx, nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, reset func()) error { return nodeTraversal(tx, nodePub[:], c.db, cb, reset) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 5a8c75a4f..dc9365a17 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -962,9 +962,7 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context, // NOTE: part of the V1Store interface. func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { - - reset := func() {} + *models.ChannelEdgePolicy) error, reset func()) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { dbNode, err := db.GetNodeByPubKey( diff --git a/graph/interfaces.go b/graph/interfaces.go index 20b63dd15..5494db61a 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -72,7 +72,7 @@ type ChannelGraphSource interface { // star-graph. ForAllOutgoingChannels(ctx context.Context, cb func(c *models.ChannelEdgeInfo, - e *models.ChannelEdgePolicy) error) error + e *models.ChannelEdgePolicy) error, reset func()) error // CurrentBlockHeight returns the block height from POV of the router // subsystem. @@ -260,7 +260,7 @@ type DB interface { // Unknown policies are passed into the callback as nil values. ForEachNodeChannel(ctx context.Context, nodePub route.Vertex, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error + *models.ChannelEdgePolicy) error, reset func()) error // AddEdgeProof sets the proof of an existing edge in the graph // database. diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index faba9fa8b..b1d281187 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -42,7 +42,7 @@ type Manager struct { // channels. The ChannelEdgePolicy parameter may be nil. ForAllOutgoingChannels func(ctx context.Context, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy) error) error + *models.ChannelEdgePolicy) error, reset func()) error // FetchChannel is used to query local channel parameters. Optionally an // existing db tx can be supplied. @@ -152,7 +152,14 @@ func (r *Manager) UpdatePolicy(ctx context.Context, // Next, we'll loop over all the outgoing channels the router knows of. // If we have a filter then we'll only collect those channels, otherwise // we'll collect them all. - err := r.ForAllOutgoingChannels(ctx, processChan) + err := r.ForAllOutgoingChannels( + ctx, processChan, + func() { + failedUpdates = nil + edgesToUpdate = nil + clear(policiesToUpdate) + }, + ) if err != nil { return nil, err } diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go index 6e53f4a27..09b4f0bc8 100644 --- a/routing/localchans/manager_test.go +++ b/routing/localchans/manager_test.go @@ -125,7 +125,7 @@ func TestManager(t *testing.T) { forAllOutgoingChannels := func(_ context.Context, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy) error, _ func()) error { for _, c := range channelSet { if err := cb(c.edgeInfo, ¤tPolicy); err != nil { diff --git a/rpcserver.go b/rpcserver.go index 258be4abd..0d003739f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7048,7 +7048,8 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context, channels []*lnrpc.ChannelEdge ) - err = graph.ForEachNodeChannel(ctx, node.PubKeyBytes, + err = graph.ForEachNodeChannel( + ctx, node.PubKeyBytes, func(edge *models.ChannelEdgeInfo, c1, c2 *models.ChannelEdgePolicy) error { @@ -7074,6 +7075,10 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context, } return nil + }, func() { + numChannels = 0 + totalCapacity = 0 + channels = nil }, ) if err != nil { @@ -7730,7 +7735,8 @@ func (r *rpcServer) FeeReport(ctx context.Context, } var feeReports []*lnrpc.ChannelFeeReport - err = channelGraph.ForEachNodeChannel(ctx, selfNode.PubKeyBytes, + err = channelGraph.ForEachNodeChannel( + ctx, selfNode.PubKeyBytes, func(chanInfo *models.ChannelEdgeInfo, edgePolicy, _ *models.ChannelEdgePolicy) error { @@ -7768,6 +7774,8 @@ func (r *rpcServer) FeeReport(ctx context.Context, }) return nil + }, func() { + feeReports = nil }, ) if err != nil { diff --git a/server.go b/server.go index 2c42dab6c..bfc1850d7 100644 --- a/server.go +++ b/server.go @@ -1254,7 +1254,8 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, DefaultRoutingPolicy: cc.RoutingPolicy, ForAllOutgoingChannels: func(ctx context.Context, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy) error, + reset func()) error { return s.graphDB.ForEachNodeChannel(ctx, selfVertex, func(c *models.ChannelEdgeInfo, @@ -1264,7 +1265,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, // NOTE: The invoked callback here may // receive a nil channel policy. return cb(c, e) - }, + }, reset, ) }, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,