diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index e70492d6f..2f9262958 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -257,6 +257,11 @@ The underlying functionality between those two options remain the same. * Log rotation can now use ZSTD +* [Remove redundant + iteration](https://github.com/lightningnetwork/lnd/pull/9496) over a node's + persisted channels when updating the graph cache with a new node or node + update. + ## Deprecations ### ⚠️ **Warning:** The following RPCs will be removed in release version **0.21**: diff --git a/graph/db/graph.go b/graph/db/graph.go index 326c66ab0..6759e18b8 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -231,13 +231,13 @@ func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph, log.Debugf("Populating in-memory channel graph, this might " + "take a while...") - err := g.ForEachNodeCacheable( - func(tx kvdb.RTx, node GraphCacheNode) error { - g.graphCache.AddNodeFeatures(node) + err := g.ForEachNodeCacheable(func(node route.Vertex, + features *lnwire.FeatureVector) error { - return nil - }, - ) + g.graphCache.AddNodeFeatures(node, features) + + return nil + }) if err != nil { return nil, err } @@ -772,8 +772,8 @@ func (c *ChannelGraph) forEachNode( // graph, executing the passed callback with each node encountered. If the // callback returns an error, then the transaction is aborted and the iteration // stops early. -func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx, - GraphCacheNode) error) error { +func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex, + *lnwire.FeatureVector) error) error { traversal := func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from @@ -792,7 +792,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx, } nodeReader := bytes.NewReader(nodeBytes) - cacheableNode, err := deserializeLightningNodeCacheable( + node, features, err := deserializeLightningNodeCacheable( //nolint:ll nodeReader, ) if err != nil { @@ -801,7 +801,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx, // Execute the callback, the transaction will abort if // this returns an error. - return cb(tx, cacheableNode) + return cb(node, features) }) } @@ -901,10 +901,9 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode, r := &batch.Request{ Update: func(tx kvdb.RwTx) error { if c.graphCache != nil { - cNode := newGraphCacheNode( + c.graphCache.AddNodeFeatures( node.PubKeyBytes, node.Features, ) - c.graphCache.AddNodeFeatures(cNode) } return addLightningNode(tx, node) @@ -3056,50 +3055,6 @@ func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx, return node, nil } -// graphCacheNode is a struct that wraps a LightningNode in a way that it can be -// cached in the graph cache. -type graphCacheNode struct { - pubKeyBytes route.Vertex - features *lnwire.FeatureVector -} - -// newGraphCacheNode returns a new cache optimized node. -func newGraphCacheNode(pubKey route.Vertex, - features *lnwire.FeatureVector) *graphCacheNode { - - return &graphCacheNode{ - pubKeyBytes: pubKey, - features: features, - } -} - -// PubKey returns the node's public identity key. -func (n *graphCacheNode) PubKey() route.Vertex { - return n.pubKeyBytes -} - -// Features returns the node's features. -func (n *graphCacheNode) Features() *lnwire.FeatureVector { - return n.features -} - -// ForEachChannel iterates through all channels of this node, executing the -// passed callback with an edge info structure and the policies of each end -// of the channel. The first edge policy is the outgoing edge *to* the -// connecting node, while the second is the incoming edge *from* the -// connecting node. If the callback returns an error, then the iteration is -// halted with the error propagated back up to the caller. -// -// Unknown policies are passed into the callback as nil values. -func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx, - cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { - - return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb) -} - -var _ GraphCacheNode = (*graphCacheNode)(nil) - // HasLightningNode determines if the graph has a vertex identified by the // target node identity public key. If the node exists in the database, a // timestamp of when the data for the node was lasted updated is returned along @@ -4059,60 +4014,59 @@ func fetchLightningNode(nodeBucket kvdb.RBucket, return deserializeLightningNode(nodeReader) } -func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) { - // Always populate a feature vector, even if we don't have a node - // announcement and short circuit below. - node := newGraphCacheNode( - route.Vertex{}, - lnwire.EmptyFeatureVector(), - ) +func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex, + *lnwire.FeatureVector, error) { - var nodeScratch [8]byte + var ( + pubKey route.Vertex + features = lnwire.EmptyFeatureVector() + nodeScratch [8]byte + ) // Skip ahead: // - LastUpdate (8 bytes) if _, err := r.Read(nodeScratch[:]); err != nil { - return nil, err + return pubKey, nil, err } - if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil { - return nil, err + if _, err := io.ReadFull(r, pubKey[:]); err != nil { + return pubKey, nil, err } // Read the node announcement flag. if _, err := r.Read(nodeScratch[:2]); err != nil { - return nil, err + return pubKey, nil, err } hasNodeAnn := byteOrder.Uint16(nodeScratch[:2]) // The rest of the data is optional, and will only be there if we got a // node announcement for this node. if hasNodeAnn == 0 { - return node, nil + return pubKey, features, nil } // We did get a node announcement for this node, so we'll have the rest // of the data available. var rgb uint8 if err := binary.Read(r, byteOrder, &rgb); err != nil { - return nil, err + return pubKey, nil, err } if err := binary.Read(r, byteOrder, &rgb); err != nil { - return nil, err + return pubKey, nil, err } if err := binary.Read(r, byteOrder, &rgb); err != nil { - return nil, err + return pubKey, nil, err } if _, err := wire.ReadVarString(r, 0); err != nil { - return nil, err + return pubKey, nil, err } - if err := node.features.Decode(r); err != nil { - return nil, err + if err := features.Decode(r); err != nil { + return pubKey, nil, err } - return node, nil + return pubKey, features, nil } func deserializeLightningNode(r io.Reader) (models.LightningNode, error) { diff --git a/graph/db/graph_cache.go b/graph/db/graph_cache.go index 67f34e86e..cefa88a85 100644 --- a/graph/db/graph_cache.go +++ b/graph/db/graph_cache.go @@ -6,33 +6,10 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) -// GraphCacheNode is an interface for all the information the cache needs to know -// about a lightning node. -type GraphCacheNode interface { - // PubKey is the node's public identity key. - PubKey() route.Vertex - - // Features returns the node's p2p features. - Features() *lnwire.FeatureVector - - // ForEachChannel iterates through all channels of a given node, - // executing the passed callback with an edge info structure and the - // policies of each end of the channel. The first edge policy is the - // outgoing edge *to* the connecting node, while the second is the - // incoming edge *from* the connecting node. If the callback returns an - // error, then the iteration is halted with the error propagated back up - // to the caller. - ForEachChannel(kvdb.RTx, - func(kvdb.RTx, *models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error -} - // DirectedChannel is a type that stores the channel information as seen from // one side of the channel. type DirectedChannel struct { @@ -124,16 +101,13 @@ func (c *GraphCache) Stats() string { } // AddNodeFeatures adds a graph node and its features to the cache. -func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) { - nodePubKey := node.PubKey() +func (c *GraphCache) AddNodeFeatures(node route.Vertex, + features *lnwire.FeatureVector) { - // Only hold the lock for a short time. The `ForEachChannel()` below is - // possibly slow as it has to go to the backend, so we can unlock - // between the calls. And the AddChannel() method will acquire its own - // lock anyway. c.mtx.Lock() - c.nodeFeatures[nodePubKey] = node.Features() - c.mtx.Unlock() + defer c.mtx.Unlock() + + c.nodeFeatures[node] = features } // AddChannel adds a non-directed channel, meaning that the order of policy 1 diff --git a/graph/db/graph_cache_test.go b/graph/db/graph_cache_test.go index 35048e7cd..087fb81ac 100644 --- a/graph/db/graph_cache_test.go +++ b/graph/db/graph_cache_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/lightningnetwork/lnd/graph/db/models" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" "github.com/stretchr/testify/require" @@ -25,39 +24,6 @@ var ( pubKey2, _ = route.NewVertexFromBytes(pubKey2Bytes) ) -type node struct { - pubKey route.Vertex - features *lnwire.FeatureVector - - edgeInfos []*models.ChannelEdgeInfo - outPolicies []*models.ChannelEdgePolicy - inPolicies []*models.ChannelEdgePolicy -} - -func (n *node) PubKey() route.Vertex { - return n.pubKey -} -func (n *node) Features() *lnwire.FeatureVector { - return n.features -} - -func (n *node) ForEachChannel(tx kvdb.RTx, - cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error) error { - - for idx := range n.edgeInfos { - err := cb( - tx, n.edgeInfos[idx], n.outPolicies[idx], - n.inPolicies[idx], - ) - if err != nil { - return err - } - } - - return nil -} - // TestGraphCacheAddNode tests that a channel going from node A to node B can be // cached correctly, independent of the direction we add the channel as. func TestGraphCacheAddNode(t *testing.T) { @@ -85,12 +51,8 @@ func TestGraphCacheAddNode(t *testing.T) { ChannelFlags: lnwire.ChanUpdateChanFlags(channelFlagB), ToNode: nodeA, } - node := &node{ - pubKey: nodeA, - features: lnwire.EmptyFeatureVector(), - } cache := NewGraphCache(10) - cache.AddNodeFeatures(node) + cache.AddNodeFeatures(nodeA, lnwire.EmptyFeatureVector()) cache.AddChannel(&models.ChannelEdgeInfo{ ChannelID: 1000, // Those are direction independent! diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 3b3454b6b..eb658e2f1 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1105,24 +1105,22 @@ func TestGraphTraversalCacheable(t *testing.T) { // Iterate through all the known channels within the graph DB by // iterating over each node, once again if the map is empty that // indicates that all edges have properly been reached. - var nodes []GraphCacheNode - err = graph.ForEachNodeCacheable( - func(tx kvdb.RTx, node GraphCacheNode) error { - delete(nodeMap, node.PubKey()) + var nodes []route.Vertex + err = graph.ForEachNodeCacheable(func(node route.Vertex, + features *lnwire.FeatureVector) error { - nodes = append(nodes, node) + delete(nodeMap, node) + nodes = append(nodes, node) - return nil - }, - ) + return nil + }) require.NoError(t, err) require.Len(t, nodeMap, 0) err = graph.db.View(func(tx kvdb.RTx) error { for _, node := range nodes { - err := node.ForEachChannel( - tx, func(tx kvdb.RTx, - info *models.ChannelEdgeInfo, + err := graph.ForEachNodeChannelTx(tx, node, + func(tx kvdb.RTx, info *models.ChannelEdgeInfo, policy *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) error { //nolint:ll @@ -3883,44 +3881,37 @@ func BenchmarkForEachChannel(b *testing.B) { maxHTLCs lnwire.MilliSatoshi ) - var nodes []GraphCacheNode - err = graph.ForEachNodeCacheable( - func(tx kvdb.RTx, node GraphCacheNode) error { - nodes = append(nodes, node) + var nodes []route.Vertex + err = graph.ForEachNodeCacheable(func(node route.Vertex, + vector *lnwire.FeatureVector) error { - return nil - }, - ) - require.NoError(b, err) - - err = graph.db.View(func(tx kvdb.RTx) error { - for _, n := range nodes { - cb := func(tx kvdb.RTx, - info *models.ChannelEdgeInfo, - policy *models.ChannelEdgePolicy, - policy2 *models.ChannelEdgePolicy) error { //nolint:ll - - // We need to do something with - // the data here, otherwise the - // compiler is going to optimize - // this away, and we get bogus - // results. - totalCapacity += info.Capacity - maxHTLCs += policy.MaxHTLC - maxHTLCs += policy2.MaxHTLC - - return nil - } - - err := n.ForEachChannel(tx, cb) - if err != nil { - return err - } - } + nodes = append(nodes, node) return nil - }, func() {}) + }) require.NoError(b, err) + + for _, n := range nodes { + cb := func(tx kvdb.RTx, + info *models.ChannelEdgeInfo, + policy *models.ChannelEdgePolicy, + policy2 *models.ChannelEdgePolicy) error { //nolint:ll + + // We need to do something with + // the data here, otherwise the + // compiler is going to optimize + // this away, and we get bogus + // results. + totalCapacity += info.Capacity + maxHTLCs += policy.MaxHTLC + maxHTLCs += policy2.MaxHTLC + + return nil + } + + err := graph.ForEachNodeChannel(n, cb) + require.NoError(b, err) + } } }