mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-25 13:12:11 +02:00
graph/db: batch collect node feature data
In this commit, we update the SQLStore's ForEachNodeCacheable and ForEachNodeCached methods to use batch collection for node feature bits. This results in the following performance gains: ``` name old time/op new time/op delta ForEachNodeCacheable-native-sqlite-10 184ms ± 2% 145ms ±10% -21.45% (p=0.000 n=10+10) ForEachNodeCacheable-native-postgres-10 697ms ± 8% 51ms ± 4% -92.68% (p=0.000 n=9+10) ```
This commit is contained in:
@@ -903,8 +903,6 @@ func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
|
||||
// graph, executing the passed callback with each node encountered. If the
|
||||
// callback returns an error, then the transaction is aborted and the iteration
|
||||
// stops early.
|
||||
//
|
||||
// NOTE: This is a part of the V1Store interface.
|
||||
func (s *SQLStore) ForEachNodeCacheable(ctx context.Context,
|
||||
cb func(route.Vertex, *lnwire.FeatureVector) error,
|
||||
reset func()) error {
|
||||
@@ -912,14 +910,8 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context,
|
||||
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
return forEachNodeCacheable(
|
||||
ctx, s.cfg.QueryCfg, db,
|
||||
func(nodeID int64, nodePub route.Vertex) error {
|
||||
features, err := getNodeFeatures(
|
||||
ctx, db, nodeID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch "+
|
||||
"node features: %w", err)
|
||||
}
|
||||
func(_ int64, nodePub route.Vertex,
|
||||
features *lnwire.FeatureVector) error {
|
||||
|
||||
return cb(nodePub, features)
|
||||
},
|
||||
@@ -1094,13 +1086,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context,
|
||||
reset func()) error {
|
||||
|
||||
handleNode := func(db SQLQueries, nodeID int64,
|
||||
nodePub route.Vertex) error {
|
||||
|
||||
features, err := getNodeFeatures(ctx, db, nodeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch node(id=%d) "+
|
||||
"features: %w", nodeID, err)
|
||||
}
|
||||
nodePub route.Vertex, features *lnwire.FeatureVector) error {
|
||||
|
||||
toNodeCallback := func() route.Vertex {
|
||||
return nodePub
|
||||
@@ -1200,8 +1186,10 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context,
|
||||
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
return forEachNodeCacheable(
|
||||
ctx, s.cfg.QueryCfg, db,
|
||||
func(nodeID int64, nodePub route.Vertex) error {
|
||||
return handleNode(db, nodeID, nodePub)
|
||||
func(nodeID int64, nodePub route.Vertex,
|
||||
features *lnwire.FeatureVector) error {
|
||||
|
||||
return handleNode(db, nodeID, nodePub, features)
|
||||
},
|
||||
)
|
||||
}, reset)
|
||||
@@ -2951,18 +2939,27 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
|
||||
}
|
||||
|
||||
// forEachNodeCacheable fetches all V1 node IDs and pub keys from the database,
|
||||
// and executes the provided callback for each node.
|
||||
// and executes the provided callback for each node. It does so via pagination
|
||||
// along with batch loading of the node feature bits.
|
||||
func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig,
|
||||
db SQLQueries,
|
||||
cb func(nodeID int64, nodePub route.Vertex) error) error {
|
||||
db SQLQueries, processNode func(nodeID int64, nodePub route.Vertex,
|
||||
features *lnwire.FeatureVector) error) error {
|
||||
|
||||
handleNode := func(_ context.Context,
|
||||
node sqlc.ListNodeIDsAndPubKeysRow) error {
|
||||
dbNode sqlc.ListNodeIDsAndPubKeysRow,
|
||||
featureBits map[int64][]int) error {
|
||||
|
||||
fv := lnwire.EmptyFeatureVector()
|
||||
if features, exists := featureBits[dbNode.ID]; exists {
|
||||
for _, bit := range features {
|
||||
fv.Set(lnwire.FeatureBit(bit))
|
||||
}
|
||||
}
|
||||
|
||||
var pub route.Vertex
|
||||
copy(pub[:], node.PubKey)
|
||||
copy(pub[:], dbNode.PubKey)
|
||||
|
||||
return cb(node.ID, pub)
|
||||
return processNode(dbNode.ID, pub, fv)
|
||||
}
|
||||
|
||||
queryFunc := func(ctx context.Context, lastID int64,
|
||||
@@ -2981,8 +2978,19 @@ func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig,
|
||||
return row.ID
|
||||
}
|
||||
|
||||
return sqldb.ExecutePaginatedQuery(
|
||||
ctx, cfg, int64(-1), queryFunc, extractCursor, handleNode,
|
||||
collectFunc := func(node sqlc.ListNodeIDsAndPubKeysRow) (int64, error) {
|
||||
return node.ID, nil
|
||||
}
|
||||
|
||||
batchQueryFunc := func(ctx context.Context,
|
||||
nodeIDs []int64) (map[int64][]int, error) {
|
||||
|
||||
return batchLoadNodeFeaturesHelper(ctx, cfg, db, nodeIDs)
|
||||
}
|
||||
|
||||
return sqldb.ExecuteCollectAndBatchWithSharedDataQuery(
|
||||
ctx, cfg, int64(-1), queryFunc, extractCursor, collectFunc,
|
||||
batchQueryFunc, handleNode,
|
||||
)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user