channeldb: implement DB fallback for path finding

If the in-memory graph cache is disabled, we fall back to querying the
database.
This commit is contained in:
Oliver Gugger 2021-10-21 13:55:21 +02:00
parent f216da32f3
commit 1fef2970cf
No known key found for this signature in database
GPG Key ID: 8E4256593F177720

View File

@ -199,7 +199,6 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
graphCache: NewGraphCache(preAllocCacheNumNodes),
}
g.chanScheduler = batch.NewTimeScheduler(
db, &g.cacheMu, batchCommitInterval,
@ -208,18 +207,25 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db, nil, batchCommitInterval,
)
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might take a " +
"while...")
err := g.ForEachNodeCacheable(func(tx kvdb.RTx, node GraphCacheNode) error {
return g.graphCache.AddNode(tx, node)
})
if err != nil {
return nil, err
}
// The graph cache can be turned off (e.g. for mobile users) for a
// speed/memory usage tradeoff.
if useGraphCache {
g.graphCache = NewGraphCache(preAllocCacheNumNodes)
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might " +
"take a while...")
err := g.ForEachNodeCacheable(
func(tx kvdb.RTx, node GraphCacheNode) error {
return g.graphCache.AddNode(tx, node)
},
)
if err != nil {
return nil, err
}
log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
time.Since(startTime), g.graphCache.Stats())
log.Debugf("Finished populating in-memory channel graph (took "+
"%v, %s)", time.Since(startTime), g.graphCache.Stats())
}
return g, nil
}
@ -373,7 +379,42 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex,
cb func(channel *DirectedChannel) error) error {
return c.graphCache.ForEachChannel(node, cb)
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
// Fallback that uses the database.
toNodeCallback := func() route.Vertex {
return node
}
toNodeFeatures, err := c.FetchNodeFeatures(node)
if err != nil {
return err
}
dbCallback := func(tx kvdb.RTx, e *ChannelEdgeInfo, p1,
p2 *ChannelEdgePolicy) error {
cachedInPolicy := NewCachedPolicy(p2)
cachedInPolicy.ToNodePubKey = toNodeCallback
cachedInPolicy.ToNodeFeatures = toNodeFeatures
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: node == e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: p1 != nil,
InPolicy: cachedInPolicy,
}
if node == e.NodeKey2Bytes {
directedChannel.OtherNode = e.NodeKey1Bytes
}
return cb(directedChannel)
}
return nodeTraversal(nil, node[:], c.db, dbCallback)
}
// FetchNodeFeatures returns the features of a given node. If no features are
@ -381,7 +422,27 @@ func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex,
func (c *ChannelGraph) FetchNodeFeatures(
node route.Vertex) (*lnwire.FeatureVector, error) {
return c.graphCache.GetFeatures(node), nil
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
// Fallback that uses the database.
targetNode, err := c.FetchLightningNode(node)
switch err {
// If the node exists and has features, return them directly.
case nil:
return targetNode.Features, nil
// If we couldn't find a node announcement, populate a blank feature
// vector.
case ErrGraphNodeNotFound:
return lnwire.EmptyFeatureVector(), nil
// Otherwise, bubble the error up.
default:
return nil, err
}
}
// DisabledChannelIDs returns the channel ids of disabled channels.
@ -601,11 +662,14 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
cNode := newGraphCacheNode(
node.PubKeyBytes, node.Features,
)
if err := c.graphCache.AddNode(tx, cNode); err != nil {
return err
if c.graphCache != nil {
cNode := newGraphCacheNode(
node.PubKeyBytes, node.Features,
)
err := c.graphCache.AddNode(tx, cNode)
if err != nil {
return err
}
}
return addLightningNode(tx, node)
@ -686,7 +750,9 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
return ErrGraphNodeNotFound
}
c.graphCache.RemoveNode(nodePub)
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}
return c.deleteLightningNode(nodes, nodePub[:])
}, func() {})
@ -814,7 +880,9 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, edge *ChannelEdgeInfo) error
return ErrEdgeAlreadyExist
}
c.graphCache.AddChannel(edge, nil, nil)
if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}
// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
@ -1015,7 +1083,9 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
return ErrEdgeNotFound
}
c.graphCache.UpdateChannel(edge)
if c.graphCache != nil {
c.graphCache.UpdateChannel(edge)
}
return putChanEdgeInfo(edgeIndex, edge, chanKey)
}, func() {})
@ -1153,7 +1223,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
c.chanCache.remove(channel.ChannelID)
}
log.Debugf("Pruned graph, cache now has %s", c.graphCache.Stats())
if c.graphCache != nil {
log.Debugf("Pruned graph, cache now has %s",
c.graphCache.Stats())
}
return chansClosed, nil
}
@ -1255,7 +1328,9 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
continue
}
c.graphCache.RemoveNode(nodePubKey)
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePubKey)
}
// If we reach this point, then there are no longer any edges
// that connect this node, so we can delete it.
@ -2100,10 +2175,12 @@ func (c *ChannelGraph) delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
return err
}
c.graphCache.RemoveChannel(
edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
edgeInfo.ChannelID,
)
if c.graphCache != nil {
c.graphCache.RemoveChannel(
edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
edgeInfo.ChannelID,
)
}
// We'll also remove the entry in the edge update index bucket before
// we delete the edges themselves so we can access their last update
@ -2360,7 +2437,12 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *ChannelEdgePolicy,
)
copy(fromNodePubKey[:], fromNode)
copy(toNodePubKey[:], toNode)
graphCache.UpdatePolicy(edge, fromNodePubKey, toNodePubKey, isUpdate1)
if graphCache != nil {
graphCache.UpdatePolicy(
edge, fromNodePubKey, toNodePubKey, isUpdate1,
)
}
return isUpdate1, nil
}
@ -3629,7 +3711,9 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
"bucket: %w", err)
}
c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
if c.graphCache != nil {
c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
}
return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
})
@ -3691,10 +3775,13 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
if err != nil {
return err
}
for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1, edgeInfo.Policy2,
)
if c.graphCache != nil {
for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
edgeInfo.Policy2,
)
}
}
return nil