From 57d4ce0f9bee3e85ffd2111b76c8a1aceb0d76d0 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Jul 2025 09:50:42 +0200 Subject: [PATCH] graph/db: expose KVStore locations where a reset is needed This commit passes no-op reset callbacks to various places where they are needed in the KVStore. Upcoming commits will replace the no-ops by adding reset params to the methods that require it. --- graph/db/kv_store.go | 104 ++++++++++++++++++++++++++------------ graph/db/sql_migration.go | 6 +++ 2 files changed, 78 insertions(+), 32 deletions(-) diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 6ca9cdedb..92700376d 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -406,9 +406,11 @@ func (c *KVStore) AddrsForNode(ctx context.Context, // callback. func (c *KVStore) ForEachChannel(_ context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy) error) error { - return forEachChannel(c.db, cb) + reset := func() {} + + return forEachChannel(c.db, cb, reset) } // forEachChannel iterates through all the channel edges stored within the @@ -421,7 +423,8 @@ func (c *KVStore) ForEachChannel(_ context.Context, // for that particular channel edge routing policy will be passed into the // callback. func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + reset func()) error { return db.View(func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) @@ -469,7 +472,7 @@ func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo, return cb(&info, policy1, policy2) }, ) - }, func() {}) + }, reset) } // ForEachChannelCacheable iterates through all the channel edges stored within @@ -487,6 +490,8 @@ func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo, func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, *models.CachedEdgePolicy, *models.CachedEdgePolicy) error) error { + reset := func() {} + return c.db.View(func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) if edges == nil { @@ -558,7 +563,7 @@ func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, ) }, ) - }, func() {}) + }, reset) } // forEachNodeDirectedChannel iterates through all channels of a given node, @@ -568,8 +573,13 @@ func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, // transaction may be provided. If none is provided, a new one will be created. // // Unknown policies are passed into the callback as nil values. +// +// NOTE: the reset param is only meaningful if the tx param is nil. If it is +// not nil, the caller is expected to have passed in a reset to the parent +// function's View/Update call which will then apply to the whole transaction. func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx, - node route.Vertex, cb func(channel *DirectedChannel) error) error { + node route.Vertex, cb func(channel *DirectedChannel) error, + reset func()) error { // Fallback that uses the database. toNodeCallback := func() route.Vertex { @@ -612,7 +622,7 @@ func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx, return cb(directedChannel) } - return nodeTraversal(tx, node[:], c.db, dbCallback) + return nodeTraversal(tx, node[:], c.db, dbCallback, reset) } // fetchNodeFeatures returns the features of a given node. If no features are @@ -650,7 +660,9 @@ func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx, func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex, cb func(channel *DirectedChannel) error) error { - return c.forEachNodeDirectedChannel(nil, nodePub, cb) + reset := func() {} + + return c.forEachNodeDirectedChannel(nil, nodePub, cb, reset) } // FetchNodeFeatures returns the features of the given node. If no features are @@ -669,7 +681,9 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) ( // NOTE: The callback contents MUST not be modified. func (c *KVStore) ForEachNodeCached(_ context.Context, cb func(node route.Vertex, - chans map[uint64]*DirectedChannel) error) error { + chans map[uint64]*DirectedChannel) error) error { + + reset := func() {} // Otherwise call back to a version that uses the database directly. // We'll iterate over each node, then the set of channels for each @@ -723,13 +737,14 @@ func (c *KVStore) ForEachNodeCached(_ context.Context, channels[e.ChannelID] = directedChannel return nil - }) + }, reset, + ) if err != nil { return err } return cb(node.PubKeyBytes, channels) - }) + }, reset) } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -793,11 +808,13 @@ func (c *KVStore) DisabledChannelIDs() ([]uint64, error) { func (c *KVStore) ForEachNode(_ context.Context, cb func(tx NodeRTx) error) error { + reset := func() {} + return forEachNode(c.db, func(tx kvdb.RTx, node *models.LightningNode) error { return cb(newChanGraphNodeTx(tx, c, node)) - }) + }, reset) } // forEachNode iterates through all the stored vertices/nodes in the graph, @@ -808,7 +825,7 @@ func (c *KVStore) ForEachNode(_ context.Context, // TODO(roasbeef): add iterator interface to allow for memory efficient graph // traversal when graph gets mega. func forEachNode(db kvdb.Backend, - cb func(kvdb.RTx, *models.LightningNode) error) error { + cb func(kvdb.RTx, *models.LightningNode) error, reset func()) error { traversal := func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from @@ -838,7 +855,7 @@ func forEachNode(db kvdb.Backend, }) } - return kvdb.View(db, traversal, func() {}) + return kvdb.View(db, traversal, reset) } // ForEachNodeCacheable iterates through all the stored vertices/nodes in the @@ -848,6 +865,8 @@ func forEachNode(db kvdb.Backend, func (c *KVStore) ForEachNodeCacheable(_ context.Context, cb func(route.Vertex, *lnwire.FeatureVector) error) error { + reset := func() {} + traversal := func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from // pubKey to node information. @@ -878,7 +897,7 @@ func (c *KVStore) ForEachNodeCacheable(_ context.Context, }) } - return kvdb.View(c.db, traversal, func() {}) + return kvdb.View(c.db, traversal, reset) } // SourceNode returns the source node of the graph. The source node is treated @@ -3018,6 +3037,8 @@ func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex, // Otherwise, we'll continue our search. return nil + }, func() { + nodeIsPublic = false }) if err != nil && !errors.Is(err, errDone) { return false, err @@ -3157,9 +3178,13 @@ func (c *KVStore) HasLightningNode(_ context.Context, // nodeTraversal is used to traverse all channels of a node given by its // public key and passes channel information into the specified callback. +// +// NOTE: the reset param is only meaningful if the tx param is nil. If it is +// not nil, the caller is expected to have passed in a reset to the parent +// function's View/Update call which will then apply to the whole transaction. func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy) error, reset func()) error { traversal := func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) @@ -3230,7 +3255,7 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, // If no transaction was provided, then we'll create a new transaction // to execute the transaction within. if tx == nil { - return kvdb.View(db, traversal, func() {}) + return kvdb.View(db, traversal, reset) } // Otherwise, we re-use the existing transaction to execute the graph @@ -3248,14 +3273,18 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, // Unknown policies are passed into the callback as nil values. func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy) error) error { - return nodeTraversal(nil, nodePub[:], c.db, func(_ kvdb.RTx, - info *models.ChannelEdgeInfo, policy, - policy2 *models.ChannelEdgePolicy) error { + reset := func() {} - return cb(info, policy, policy2) - }) + return nodeTraversal( + nil, nodePub[:], c.db, func(_ kvdb.RTx, + info *models.ChannelEdgeInfo, policy, + policy2 *models.ChannelEdgePolicy) error { + + return cb(info, policy, policy2) + }, reset, + ) } // ForEachSourceNodeChannel iterates through all channels of the source node, @@ -3264,7 +3293,9 @@ func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex, // peer's node information. func (c *KVStore) ForEachSourceNodeChannel(_ context.Context, cb func(chanPoint wire.OutPoint, havePolicy bool, - otherNode *models.LightningNode) error) error { + otherNode *models.LightningNode) error) error { + + reset := func() {} return kvdb.View(c.db, func(tx kvdb.RTx) error { nodes := tx.ReadBucket(nodeBucket) @@ -3292,9 +3323,9 @@ func (c *KVStore) ForEachSourceNodeChannel(_ context.Context, return cb( info.ChannelPoint, policy != nil, peer, ) - }, + }, reset, ) - }, func() {}) + }, reset) } // forEachNodeChannelTx iterates through all channels of the given node, @@ -3310,12 +3341,14 @@ func (c *KVStore) ForEachSourceNodeChannel(_ context.Context, // should be passed as the first argument. Otherwise, the first argument should // be nil and a fresh transaction will be created to execute the graph // traversal. +// +// NOTE: the reset function is only meaningful if the tx param is nil. func (c *KVStore) forEachNodeChannelTx(tx kvdb.RTx, nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + reset func()) error { - return nodeTraversal(tx, nodePub[:], c.db, cb) + return nodeTraversal(tx, nodePub[:], c.db, cb, reset) } // fetchOtherNode attempts to fetch the full LightningNode that's opposite of @@ -3956,12 +3989,14 @@ func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { // GraphSession will provide the call-back with access to a NodeTraverser // instance which can be used to perform queries against the channel graph. func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error { + reset := func() {} + return c.db.View(func(tx walletdb.ReadTx) error { return cb(&nodeTraverserSession{ db: c, tx: tx, }) - }, func() {}) + }, reset) } // nodeTraverserSession implements the NodeTraverser interface but with a @@ -3978,7 +4013,7 @@ type nodeTraverserSession struct { func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex, cb func(channel *DirectedChannel) error) error { - return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb) + return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb, func() {}) } // FetchNodeFeatures returns the features of the given node. If the node is @@ -4902,11 +4937,16 @@ func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) { func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { - return c.db.forEachNodeChannelTx(c.tx, c.node.PubKeyBytes, + return c.db.forEachNodeChannelTx( + c.tx, c.node.PubKeyBytes, func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy) error { return f(info, policy1, policy2) }, + // NOTE: We don't need to reset anything here as the caller is + // expected to pass in the reset function to the ForEachNode + // method that constructed the chanGraphNodeTx. + func() {}, ) } diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index dfe5eb985..6cc6ab2ff 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -206,6 +206,9 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, return sqldb.CompareRecords( node, migratedNode, fmt.Sprintf("node %x", pub), ) + }, func() { + // No reset is needed since if a retry occurs, the entire + // migration will be retried from the start. }) if err != nil { return fmt.Errorf("could not migrate nodes: %w", err) @@ -384,6 +387,9 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, } return nil + }, func() { + // No reset is needed since if a retry occurs, the entire + // migration will be retried from the start. }) if err != nil { return fmt.Errorf("could not migrate channels and policies: %w",