graph/db: implement ForEachNodeChannel

Which lets us then run `TestIncompleteChannelPolicies` and
`BenchmarkForEachChannel` against our SQL backends.
This commit is contained in:
Elle Mouton
2025-06-11 16:27:09 +02:00
parent 0607982886
commit ece157b40b
3 changed files with 38 additions and 2 deletions

View File

@@ -77,6 +77,7 @@ circuit. The indices are only available for forwarding events saved after v0.20.
* [2](https://github.com/lightningnetwork/lnd/pull/9869)
* [3](https://github.com/lightningnetwork/lnd/pull/9887)
* [4](https://github.com/lightningnetwork/lnd/pull/9931)
* [5](https://github.com/lightningnetwork/lnd/pull/9935)
## RPC Updates

View File

@@ -3041,7 +3041,7 @@ func TestIncompleteChannelPolicies(t *testing.T) {
t.Parallel()
ctx := context.Background()
graph := MakeTestGraph(t)
graph := MakeTestGraphNew(t)
// Create two nodes.
node1 := createTestVertex(t)
@@ -4110,7 +4110,7 @@ func TestBatchedUpdateEdgePolicy(t *testing.T) {
// BenchmarkForEachChannel is a benchmark test that measures the number of
// allocations and the total memory consumed by the full graph traversal.
func BenchmarkForEachChannel(b *testing.B) {
graph := MakeTestGraph(b)
graph := MakeTestGraphNew(b)
const numNodes = 100
const numChannels = 4

View File

@@ -895,6 +895,41 @@ func (s *SQLStore) ForEachNodeCacheable(cb func(route.Vertex,
return nil
}
// ForEachNodeChannel iterates through all channels of the given node,
// executing the passed callback with an edge info structure and the policies
// of each end of the channel. The first edge policy is the outgoing edge *to*
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) ForEachNodeChannel(nodePub route.Vertex,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {
var ctx = context.TODO()
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
dbNode, err := db.GetNodeByPubKey(
ctx, sqlc.GetNodeByPubKeyParams{
Version: int16(ProtocolV1),
PubKey: nodePub[:],
},
)
if errors.Is(err, sql.ErrNoRows) {
return nil
} else if err != nil {
return fmt.Errorf("unable to fetch node: %w", err)
}
return forEachNodeChannel(
ctx, db, s.cfg.ChainHash, dbNode.ID, cb,
)
}, sqldb.NoOpReset)
}
// forEachNodeDirectedChannel iterates through all channels of a given
// node, executing the passed callback on the directed edge representing the
// channel and its incoming policy. If the node is not found, no error is