graph/db: expose KVStore locations where a reset is needed

This commit passes no-op reset callbacks to various places where they
are needed in the KVStore. Upcoming commits will replace the no-ops by
adding reset params to the methods that require it.
This commit is contained in:
Elle Mouton
2025-07-11 09:50:42 +02:00
parent 302551ade0
commit 57d4ce0f9b
2 changed files with 78 additions and 32 deletions

View File

@@ -406,9 +406,11 @@ func (c *KVStore) AddrsForNode(ctx context.Context,
// callback.
func (c *KVStore) ForEachChannel(_ context.Context,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {
*models.ChannelEdgePolicy) error) error {
return forEachChannel(c.db, cb)
reset := func() {}
return forEachChannel(c.db, cb, reset)
}
// forEachChannel iterates through all the channel edges stored within the
@@ -421,7 +423,8 @@ func (c *KVStore) ForEachChannel(_ context.Context,
// for that particular channel edge routing policy will be passed into the
// callback.
func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error,
reset func()) error {
return db.View(func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
@@ -469,7 +472,7 @@ func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo,
return cb(&info, policy1, policy2)
},
)
}, func() {})
}, reset)
}
// ForEachChannelCacheable iterates through all the channel edges stored within
@@ -487,6 +490,8 @@ func forEachChannel(db kvdb.Backend, cb func(*models.ChannelEdgeInfo,
func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
*models.CachedEdgePolicy, *models.CachedEdgePolicy) error) error {
reset := func() {}
return c.db.View(func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
@@ -558,7 +563,7 @@ func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
)
},
)
}, func() {})
}, reset)
}
// forEachNodeDirectedChannel iterates through all channels of a given node,
@@ -568,8 +573,13 @@ func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
// transaction may be provided. If none is provided, a new one will be created.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: the reset param is only meaningful if the tx param is nil. If it is
// not nil, the caller is expected to have passed in a reset to the parent
// function's View/Update call which will then apply to the whole transaction.
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error {
node route.Vertex, cb func(channel *DirectedChannel) error,
reset func()) error {
// Fallback that uses the database.
toNodeCallback := func() route.Vertex {
@@ -612,7 +622,7 @@ func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
return cb(directedChannel)
}
return nodeTraversal(tx, node[:], c.db, dbCallback)
return nodeTraversal(tx, node[:], c.db, dbCallback, reset)
}
// fetchNodeFeatures returns the features of a given node. If no features are
@@ -650,7 +660,9 @@ func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error {
return c.forEachNodeDirectedChannel(nil, nodePub, cb)
reset := func() {}
return c.forEachNodeDirectedChannel(nil, nodePub, cb, reset)
}
// FetchNodeFeatures returns the features of the given node. If no features are
@@ -669,7 +681,9 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
// NOTE: The callback contents MUST not be modified.
func (c *KVStore) ForEachNodeCached(_ context.Context,
cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {
chans map[uint64]*DirectedChannel) error) error {
reset := func() {}
// Otherwise call back to a version that uses the database directly.
// We'll iterate over each node, then the set of channels for each
@@ -723,13 +737,14 @@ func (c *KVStore) ForEachNodeCached(_ context.Context,
channels[e.ChannelID] = directedChannel
return nil
})
}, reset,
)
if err != nil {
return err
}
return cb(node.PubKeyBytes, channels)
})
}, reset)
}
// DisabledChannelIDs returns the channel ids of disabled channels.
@@ -793,11 +808,13 @@ func (c *KVStore) DisabledChannelIDs() ([]uint64, error) {
func (c *KVStore) ForEachNode(_ context.Context,
cb func(tx NodeRTx) error) error {
reset := func() {}
return forEachNode(c.db, func(tx kvdb.RTx,
node *models.LightningNode) error {
return cb(newChanGraphNodeTx(tx, c, node))
})
}, reset)
}
// forEachNode iterates through all the stored vertices/nodes in the graph,
@@ -808,7 +825,7 @@ func (c *KVStore) ForEachNode(_ context.Context,
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
// traversal when graph gets mega.
func forEachNode(db kvdb.Backend,
cb func(kvdb.RTx, *models.LightningNode) error) error {
cb func(kvdb.RTx, *models.LightningNode) error, reset func()) error {
traversal := func(tx kvdb.RTx) error {
// First grab the nodes bucket which stores the mapping from
@@ -838,7 +855,7 @@ func forEachNode(db kvdb.Backend,
})
}
return kvdb.View(db, traversal, func() {})
return kvdb.View(db, traversal, reset)
}
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
@@ -848,6 +865,8 @@ func forEachNode(db kvdb.Backend,
func (c *KVStore) ForEachNodeCacheable(_ context.Context,
cb func(route.Vertex, *lnwire.FeatureVector) error) error {
reset := func() {}
traversal := func(tx kvdb.RTx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
@@ -878,7 +897,7 @@ func (c *KVStore) ForEachNodeCacheable(_ context.Context,
})
}
return kvdb.View(c.db, traversal, func() {})
return kvdb.View(c.db, traversal, reset)
}
// SourceNode returns the source node of the graph. The source node is treated
@@ -3018,6 +3037,8 @@ func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex,
// Otherwise, we'll continue our search.
return nil
}, func() {
nodeIsPublic = false
})
if err != nil && !errors.Is(err, errDone) {
return false, err
@@ -3157,9 +3178,13 @@ func (c *KVStore) HasLightningNode(_ context.Context,
// nodeTraversal is used to traverse all channels of a node given by its
// public key and passes channel information into the specified callback.
//
// NOTE: the reset param is only meaningful if the tx param is nil. If it is
// not nil, the caller is expected to have passed in a reset to the parent
// function's View/Update call which will then apply to the whole transaction.
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {
*models.ChannelEdgePolicy) error, reset func()) error {
traversal := func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
@@ -3230,7 +3255,7 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
// If no transaction was provided, then we'll create a new transaction
// to execute the transaction within.
if tx == nil {
return kvdb.View(db, traversal, func() {})
return kvdb.View(db, traversal, reset)
}
// Otherwise, we re-use the existing transaction to execute the graph
@@ -3248,14 +3273,18 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
// Unknown policies are passed into the callback as nil values.
func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {
*models.ChannelEdgePolicy) error) error {
return nodeTraversal(nil, nodePub[:], c.db, func(_ kvdb.RTx,
info *models.ChannelEdgeInfo, policy,
policy2 *models.ChannelEdgePolicy) error {
reset := func() {}
return cb(info, policy, policy2)
})
return nodeTraversal(
nil, nodePub[:], c.db, func(_ kvdb.RTx,
info *models.ChannelEdgeInfo, policy,
policy2 *models.ChannelEdgePolicy) error {
return cb(info, policy, policy2)
}, reset,
)
}
// ForEachSourceNodeChannel iterates through all channels of the source node,
@@ -3264,7 +3293,9 @@ func (c *KVStore) ForEachNodeChannel(_ context.Context, nodePub route.Vertex,
// peer's node information.
func (c *KVStore) ForEachSourceNodeChannel(_ context.Context,
cb func(chanPoint wire.OutPoint, havePolicy bool,
otherNode *models.LightningNode) error) error {
otherNode *models.LightningNode) error) error {
reset := func() {}
return kvdb.View(c.db, func(tx kvdb.RTx) error {
nodes := tx.ReadBucket(nodeBucket)
@@ -3292,9 +3323,9 @@ func (c *KVStore) ForEachSourceNodeChannel(_ context.Context,
return cb(
info.ChannelPoint, policy != nil, peer,
)
},
}, reset,
)
}, func() {})
}, reset)
}
// forEachNodeChannelTx iterates through all channels of the given node,
@@ -3310,12 +3341,14 @@ func (c *KVStore) ForEachSourceNodeChannel(_ context.Context,
// should be passed as the first argument. Otherwise, the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
//
// NOTE: the reset function is only meaningful if the tx param is nil.
func (c *KVStore) forEachNodeChannelTx(tx kvdb.RTx,
nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
*models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error {
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error,
reset func()) error {
return nodeTraversal(tx, nodePub[:], c.db, cb)
return nodeTraversal(tx, nodePub[:], c.db, cb, reset)
}
// fetchOtherNode attempts to fetch the full LightningNode that's opposite of
@@ -3956,12 +3989,14 @@ func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph.
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
reset := func() {}
return c.db.View(func(tx walletdb.ReadTx) error {
return cb(&nodeTraverserSession{
db: c,
tx: tx,
})
}, func() {})
}, reset)
}
// nodeTraverserSession implements the NodeTraverser interface but with a
@@ -3978,7 +4013,7 @@ type nodeTraverserSession struct {
func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error {
return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb, func() {})
}
// FetchNodeFeatures returns the features of the given node. If the node is
@@ -4902,11 +4937,16 @@ func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
return c.db.forEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
return c.db.forEachNodeChannelTx(
c.tx, c.node.PubKeyBytes,
func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
policy2 *models.ChannelEdgePolicy) error {
return f(info, policy1, policy2)
},
// NOTE: We don't need to reset anything here as the caller is
// expected to pass in the reset function to the ForEachNode
// method that constructed the chanGraphNodeTx.
func() {},
)
}

View File

@@ -206,6 +206,9 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
return sqldb.CompareRecords(
node, migratedNode, fmt.Sprintf("node %x", pub),
)
}, func() {
// No reset is needed since if a retry occurs, the entire
// migration will be retried from the start.
})
if err != nil {
return fmt.Errorf("could not migrate nodes: %w", err)
@@ -384,6 +387,9 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
}
return nil
}, func() {
// No reset is needed since if a retry occurs, the entire
// migration will be retried from the start.
})
if err != nil {
return fmt.Errorf("could not migrate channels and policies: %w",