graph/db: impl ForEachChannelCacheable

This commit is contained in:
Elle Mouton
2025-06-11 18:07:17 +02:00
parent 34cb6a9f27
commit 23c8f7099c
2 changed files with 98 additions and 0 deletions

View File

@@ -89,6 +89,7 @@ circuit. The indices are only available for forwarding events saved after v0.20.
* [7](https://github.com/lightningnetwork/lnd/pull/9937)
* [8](https://github.com/lightningnetwork/lnd/pull/9938)
* [9](https://github.com/lightningnetwork/lnd/pull/9939)
* [10](https://github.com/lightningnetwork/lnd/pull/9971)
## RPC Updates

View File

@@ -1238,6 +1238,103 @@ func (s *SQLStore) ForEachNodeCached(cb func(node route.Vertex,
}, sqldb.NoOpReset)
}
// ForEachChannelCacheable iterates through all the channel edges stored
// within the graph and invokes the passed callback for each edge. The
// callback takes two edges as since this is a directed graph, both the
// in/out edges are visited. If the callback returns an error, then the
// transaction is aborted and the iteration stops early.
//
// NOTE: If an edge can't be found, or wasn't advertised, then a nil
// pointer for that particular channel edge routing policy will be
// passed into the callback.
//
// NOTE: this method is like ForEachChannel but fetches only the data
// required for the graph cache.
func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
*models.CachedEdgePolicy,
*models.CachedEdgePolicy) error) error {
ctx := context.TODO()
handleChannel := func(db SQLQueries,
row sqlc.ListChannelsWithPoliciesPaginatedRow) error {
node1, node2, err := buildNodeVertices(
row.Node1Pubkey, row.Node2Pubkey,
)
if err != nil {
return err
}
edge := buildCacheableChannelInfo(row.Channel, node1, node2)
dbPol1, dbPol2, err := extractChannelPolicies(row)
if err != nil {
return err
}
var pol1, pol2 *models.CachedEdgePolicy
if dbPol1 != nil {
policy1, err := buildChanPolicy(
*dbPol1, edge.ChannelID, nil, node2, true,
)
if err != nil {
return err
}
pol1 = models.NewCachedPolicy(policy1)
}
if dbPol2 != nil {
policy2, err := buildChanPolicy(
*dbPol2, edge.ChannelID, nil, node1, false,
)
if err != nil {
return err
}
pol2 = models.NewCachedPolicy(policy2)
}
if err := cb(edge, pol1, pol2); err != nil {
return err
}
return nil
}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
lastID := int64(-1)
for {
//nolint:ll
rows, err := db.ListChannelsWithPoliciesPaginated(
ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
},
)
if err != nil {
return err
}
if len(rows) == 0 {
break
}
for _, row := range rows {
err := handleChannel(db, row)
if err != nil {
return err
}
lastID = row.Channel.ID
}
}
return nil
}, sqldb.NoOpReset)
}
// ForEachChannel iterates through all the channel edges stored within the
// graph and invokes the passed callback for each edge. The callback takes two
// edges as since this is a directed graph, both the in/out edges are visited.