From ece157b40b23147153a09d6cab4d982f8b084d4b Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 11 Jun 2025 16:27:09 +0200 Subject: [PATCH] graph/db: implement ForEachNodeChannel Which lets us then run `TestIncompleteChannelPolicies` and `BenchmarkForEachChannel` against our SQL backends. --- docs/release-notes/release-notes-0.20.0.md | 1 + graph/db/graph_test.go | 4 +-- graph/db/sql_store.go | 35 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index d5edcab17..496a20475 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -77,6 +77,7 @@ circuit. The indices are only available for forwarding events saved after v0.20. * [2](https://github.com/lightningnetwork/lnd/pull/9869) * [3](https://github.com/lightningnetwork/lnd/pull/9887) * [4](https://github.com/lightningnetwork/lnd/pull/9931) + * [5](https://github.com/lightningnetwork/lnd/pull/9935) ## RPC Updates diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index f681a1f75..789a4c0fb 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -3041,7 +3041,7 @@ func TestIncompleteChannelPolicies(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // Create two nodes. node1 := createTestVertex(t) @@ -4110,7 +4110,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) { // BenchmarkForEachChannel is a benchmark test that measures the number of // allocations and the total memory consumed by the full graph traversal. func BenchmarkForEachChannel(b *testing.B) { - graph := MakeTestGraph(b) + graph := MakeTestGraphNew(b) const numNodes = 100 const numChannels = 4 diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index e7bb18904..22512619c 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -895,6 +895,41 @@ func (s *SQLStore) ForEachNodeCacheable(cb func(route.Vertex, return nil } +// ForEachNodeChannel iterates through all channels of the given node, +// executing the passed callback with an edge info structure and the policies +// of each end of the channel. The first edge policy is the outgoing edge *to* +// the connecting node, while the second is the incoming edge *from* the +// connecting node. If the callback returns an error, then the iteration is +// halted with the error propagated back up to the caller. +// +// Unknown policies are passed into the callback as nil values. +// +// NOTE: part of the V1Store interface. +func (s *SQLStore) ForEachNodeChannel(nodePub route.Vertex, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error) error { + + var ctx = context.TODO() + + return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + dbNode, err := db.GetNodeByPubKey( + ctx, sqlc.GetNodeByPubKeyParams{ + Version: int16(ProtocolV1), + PubKey: nodePub[:], + }, + ) + if errors.Is(err, sql.ErrNoRows) { + return nil + } else if err != nil { + return fmt.Errorf("unable to fetch node: %w", err) + } + + return forEachNodeChannel( + ctx, db, s.cfg.ChainHash, dbNode.ID, cb, + ) + }, sqldb.NoOpReset) +} + // forEachNodeDirectedChannel iterates through all channels of a given // node, executing the passed callback on the directed edge representing the // channel and its incoming policy. If the node is not found, no error is