graph/db: move cache read checks to ChannelGraph.

This commit moves the graph cache checks for FetchNodeFeatures,
ForEachNodeDirectedChannel, GraphSession and ForEachNodeCached from the
KVStore to the ChannelGraph. Since the ChannelGraph is currently just a
pass-through for any of the KVStore methods, all that needs to be done
for calls to go via the ChannelGraph instead directly to the KVStore is
for the ChannelGraph to go and implement those methods.
This commit is contained in:
Elle Mouton 2025-02-18 14:27:00 -03:00
parent 88398e3dd9
commit 9fe9e32c6e
No known key found for this signature in database
GPG Key ID: D7D916376026F177
3 changed files with 68 additions and 30 deletions

View File

@ -91,3 +91,65 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
graphCache: graphCache, graphCache: graphCache,
}, nil }, nil
} }
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
cb func(channel *DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
return c.KVStore.ForEachNodeDirectedChannel(node, cb)
}
// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
*lnwire.FeatureVector, error) {
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
return c.KVStore.FetchNodeFeatures(node)
}
// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(c)
}
return c.KVStore.GraphSession(cb)
}
// ForEachNodeCached iterates through all the stored vertices/nodes in the
// graph, executing the passed callback with each node encountered.
//
// NOTE: The callback contents MUST not be modified.
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}
return c.KVStore.ForEachNodeCached(cb)
}

View File

@ -3953,7 +3953,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
getSingleChannel := func() *DirectedChannel { getSingleChannel := func() *DirectedChannel {
var ch *DirectedChannel var ch *DirectedChannel
err = graph.forEachNodeDirectedChannel(nil, node1.PubKeyBytes, err = graph.ForEachNodeDirectedChannel(node1.PubKeyBytes,
func(c *DirectedChannel) error { func(c *DirectedChannel) error {
require.Nil(t, ch) require.Nil(t, ch)
ch = c ch = c

View File

@ -480,10 +480,6 @@ func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx, func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error { node route.Vertex, cb func(channel *DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
// Fallback that uses the database. // Fallback that uses the database.
toNodeCallback := func() route.Vertex { toNodeCallback := func() route.Vertex {
return node return node
@ -539,10 +535,6 @@ func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx, func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
node route.Vertex) (*lnwire.FeatureVector, error) { node route.Vertex) (*lnwire.FeatureVector, error) {
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
// Fallback that uses the database. // Fallback that uses the database.
targetNode, err := c.FetchLightningNodeTx(tx, node) targetNode, err := c.FetchLightningNodeTx(tx, node)
switch { switch {
@ -564,9 +556,7 @@ func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
// ForEachNodeDirectedChannel iterates through all channels of a given node, // ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel // executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration // and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache // is halted with the error propagated back up to the caller.
// is available, then it will be used to retrieve the node's channels instead
// of the database.
// //
// Unknown policies are passed into the callback as nil values. // Unknown policies are passed into the callback as nil values.
// //
@ -579,8 +569,6 @@ func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
// FetchNodeFeatures returns the features of the given node. If no features are // FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned. // known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
// //
// NOTE: this is part of the graphdb.NodeTraverser interface. // NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) ( func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
@ -589,18 +577,13 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
return c.fetchNodeFeatures(nil, nodePub) return c.fetchNodeFeatures(nil, nodePub)
} }
// ForEachNodeCached is similar to forEachNode, but it utilizes the channel // ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
// graph cache instead. Note that this doesn't return all the information the // data to the call-back.
// regular forEachNode method does.
// //
// NOTE: The callback contents MUST not be modified. // NOTE: The callback contents MUST not be modified.
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex, func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error { chans map[uint64]*DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}
// Otherwise call back to a version that uses the database directly. // Otherwise call back to a version that uses the database directly.
// We'll iterate over each node, then the set of channels for each // We'll iterate over each node, then the set of channels for each
// node, and construct a similar callback functiopn signature as the // node, and construct a similar callback functiopn signature as the
@ -3883,14 +3866,8 @@ func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
} }
// GraphSession will provide the call-back with access to a NodeTraverser // GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If // instance which can be used to perform queries against the channel graph.
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error { func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(&nodeTraverserSession{db: c})
}
return c.db.View(func(tx walletdb.ReadTx) error { return c.db.View(func(tx walletdb.ReadTx) error {
return cb(&nodeTraverserSession{ return cb(&nodeTraverserSession{
db: c, db: c,
@ -3900,8 +3877,7 @@ func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
} }
// nodeTraverserSession implements the NodeTraverser interface but with a // nodeTraverserSession implements the NodeTraverser interface but with a
// backing read only transaction for a consistent view of the graph in the case // backing read only transaction for a consistent view of the graph.
// where the graph Cache has not been enabled.
type nodeTraverserSession struct { type nodeTraverserSession struct {
tx kvdb.RTx tx kvdb.RTx
db *KVStore db *KVStore