graph/db: move various cache write calls to ChannelGraph

Here, we move the graph cache writes for AddLightningNode,
DeleteLightningNode, AddChannelEdge and MarkEdgeLive to the
ChannelGraph. Since these are writes, the cache is only updated if the
DB write is successful.
This commit is contained in:
Elle Mouton
2025-02-18 14:48:46 -03:00
parent 9fe9e32c6e
commit f75e6a1c10
3 changed files with 107 additions and 30 deletions

View File

@@ -1,8 +1,10 @@
package graphdb
import (
"sync"
"time"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
@@ -27,6 +29,10 @@ type Config struct {
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
// into this layer so that the KVStore is only responsible for CRUD operations.
type ChannelGraph struct {
// cacheMu guards any writes to the graphCache. It should be held
// across the DB write call and the graphCache update to make the
// two updates as atomic as possible.
cacheMu sync.Mutex
graphCache *GraphCache
*KVStore
@@ -153,3 +159,103 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
return c.KVStore.ForEachNodeCached(cb)
}
// AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's
// information. Note that this method is expected to only be called to update an
// already present node from a node announcement, or to insert a node found in a
// channel update.
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
op ...batch.SchedulerOption) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.KVStore.AddLightningNode(node, op...)
if err != nil {
return err
}
if c.graphCache != nil {
c.graphCache.AddNodeFeatures(
node.PubKeyBytes, node.Features,
)
}
return nil
}
// DeleteLightningNode starts a new database transaction to remove a vertex/node
// from the database according to the node's public key.
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.KVStore.DeleteLightningNode(nodePub)
if err != nil {
return err
}
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}
return nil
}
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The information stored
// denotes the static attributes of the channel, such as the channelID, the keys
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.KVStore.AddChannelEdge(edge, op...)
if err != nil {
return err
}
if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}
return nil
}
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
// If the cache is enabled, the edge will be added back to the graph cache if
// we still have a record of this channel in the DB.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.KVStore.MarkEdgeLive(chanID)
if err != nil {
return err
}
if c.graphCache != nil {
// We need to add the channel back into our graph cache,
// otherwise we won't use it for path finding.
infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}
if len(infos) == 0 {
return nil
}
info := infos[0]
c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
}
return nil
}

View File

@@ -884,12 +884,6 @@ func (c *KVStore) AddLightningNode(node *models.LightningNode,
r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
if c.graphCache != nil {
c.graphCache.AddNodeFeatures(
node.PubKeyBytes, node.Features,
)
}
return addLightningNode(tx, node)
},
}
@@ -969,10 +963,6 @@ func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error {
return ErrGraphNodeNotFound
}
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}
return c.deleteLightningNode(nodes, nodePub[:])
}, func() {})
}
@@ -1104,10 +1094,6 @@ func (c *KVStore) addChannelEdge(tx kvdb.RwTx,
return ErrEdgeAlreadyExist
}
if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}
// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
// doesn't, then we'll insert a "shell" node that just includes its
@@ -3717,22 +3703,6 @@ func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
if c.graphCache != nil {
edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
if err != nil {
return err
}
for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
edgeInfo.Policy2,
)
}
}
return nil
}