graph/db: move cache read checks to ChannelGraph.

This commit moves the graph cache checks for FetchNodeFeatures,
ForEachNodeDirectedChannel, GraphSession and ForEachNodeCached from the
KVStore to the ChannelGraph. Since the ChannelGraph is currently just a
pass-through for any of the KVStore methods, all that needs to be done
for calls to go via the ChannelGraph instead directly to the KVStore is
for the ChannelGraph to go and implement those methods.
This commit is contained in:
Elle Mouton 2025-02-18 14:27:00 -03:00
parent ac107ab365
commit 138e846879
No known key found for this signature in database
GPG Key ID: D7D916376026F177
3 changed files with 68 additions and 30 deletions

View File

@ -91,3 +91,65 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
graphCache: graphCache,
}, nil
}
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
cb func(channel *DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
return c.KVStore.ForEachNodeDirectedChannel(node, cb)
}
// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
*lnwire.FeatureVector, error) {
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
return c.KVStore.FetchNodeFeatures(node)
}
// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(c)
}
return c.KVStore.GraphSession(cb)
}
// ForEachNodeCached iterates through all the stored vertices/nodes in the
// graph, executing the passed callback with each node encountered.
//
// NOTE: The callback contents MUST not be modified.
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}
return c.KVStore.ForEachNodeCached(cb)
}

View File

@ -3953,7 +3953,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
getSingleChannel := func() *DirectedChannel {
var ch *DirectedChannel
err = graph.forEachNodeDirectedChannel(nil, node1.PubKeyBytes,
err = graph.ForEachNodeDirectedChannel(node1.PubKeyBytes,
func(c *DirectedChannel) error {
require.Nil(t, ch)
ch = c

View File

@ -480,10 +480,6 @@ func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
// Fallback that uses the database.
toNodeCallback := func() route.Vertex {
return node
@ -539,10 +535,6 @@ func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
node route.Vertex) (*lnwire.FeatureVector, error) {
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
// Fallback that uses the database.
targetNode, err := c.FetchLightningNodeTx(tx, node)
switch {
@ -564,9 +556,7 @@ func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
// is halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
@ -579,8 +569,6 @@ func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
@ -589,18 +577,13 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
return c.fetchNodeFeatures(nil, nodePub)
}
// ForEachNodeCached is similar to forEachNode, but it utilizes the channel
// graph cache instead. Note that this doesn't return all the information the
// regular forEachNode method does.
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
// data to the call-back.
//
// NOTE: The callback contents MUST not be modified.
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}
// Otherwise call back to a version that uses the database directly.
// We'll iterate over each node, then the set of channels for each
// node, and construct a similar callback functiopn signature as the
@ -3883,14 +3866,8 @@ func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
}
// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
// instance which can be used to perform queries against the channel graph.
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(&nodeTraverserSession{db: c})
}
return c.db.View(func(tx walletdb.ReadTx) error {
return cb(&nodeTraverserSession{
db: c,
@ -3900,8 +3877,7 @@ func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
}
// nodeTraverserSession implements the NodeTraverser interface but with a
// backing read only transaction for a consistent view of the graph in the case
// where the graph Cache has not been enabled.
// backing read only transaction for a consistent view of the graph.
type nodeTraverserSession struct {
tx kvdb.RTx
db *KVStore