From ada349dcf29e722ce1301ea1925b881186d80335 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 1 Aug 2025 14:35:03 +0200 Subject: [PATCH] graph/db: batch collect node feature data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we update the SQLStore's ForEachNodeCacheable and ForEachNodeCached methods to use batch collection for node feature bits. This results in the following performance gains: ``` name old time/op new time/op delta ForEachNodeCacheable-native-sqlite-10 184ms ± 2% 145ms ±10% -21.45% (p=0.000 n=10+10) ForEachNodeCacheable-native-postgres-10 697ms ± 8% 51ms ± 4% -92.68% (p=0.000 n=9+10) ``` --- graph/db/sql_store.go | 62 ++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 02a3f6372..770481217 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -903,8 +903,6 @@ func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex, // graph, executing the passed callback with each node encountered. If the // callback returns an error, then the transaction is aborted and the iteration // stops early. -// -// NOTE: This is a part of the V1Store interface. func (s *SQLStore) ForEachNodeCacheable(ctx context.Context, cb func(route.Vertex, *lnwire.FeatureVector) error, reset func()) error { @@ -912,14 +910,8 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context, err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return forEachNodeCacheable( ctx, s.cfg.QueryCfg, db, - func(nodeID int64, nodePub route.Vertex) error { - features, err := getNodeFeatures( - ctx, db, nodeID, - ) - if err != nil { - return fmt.Errorf("unable to fetch "+ - "node features: %w", err) - } + func(_ int64, nodePub route.Vertex, + features *lnwire.FeatureVector) error { return cb(nodePub, features) }, @@ -1094,13 +1086,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, reset func()) error { handleNode := func(db SQLQueries, nodeID int64, - nodePub route.Vertex) error { - - features, err := getNodeFeatures(ctx, db, nodeID) - if err != nil { - return fmt.Errorf("unable to fetch node(id=%d) "+ - "features: %w", nodeID, err) - } + nodePub route.Vertex, features *lnwire.FeatureVector) error { toNodeCallback := func() route.Vertex { return nodePub @@ -1200,8 +1186,10 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return forEachNodeCacheable( ctx, s.cfg.QueryCfg, db, - func(nodeID int64, nodePub route.Vertex) error { - return handleNode(db, nodeID, nodePub) + func(nodeID int64, nodePub route.Vertex, + features *lnwire.FeatureVector) error { + + return handleNode(db, nodeID, nodePub, features) }, ) }, reset) @@ -2951,18 +2939,27 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries, } // forEachNodeCacheable fetches all V1 node IDs and pub keys from the database, -// and executes the provided callback for each node. +// and executes the provided callback for each node. It does so via pagination +// along with batch loading of the node feature bits. func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig, - db SQLQueries, - cb func(nodeID int64, nodePub route.Vertex) error) error { + db SQLQueries, processNode func(nodeID int64, nodePub route.Vertex, + features *lnwire.FeatureVector) error) error { handleNode := func(_ context.Context, - node sqlc.ListNodeIDsAndPubKeysRow) error { + dbNode sqlc.ListNodeIDsAndPubKeysRow, + featureBits map[int64][]int) error { + + fv := lnwire.EmptyFeatureVector() + if features, exists := featureBits[dbNode.ID]; exists { + for _, bit := range features { + fv.Set(lnwire.FeatureBit(bit)) + } + } var pub route.Vertex - copy(pub[:], node.PubKey) + copy(pub[:], dbNode.PubKey) - return cb(node.ID, pub) + return processNode(dbNode.ID, pub, fv) } queryFunc := func(ctx context.Context, lastID int64, @@ -2981,8 +2978,19 @@ func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig, return row.ID } - return sqldb.ExecutePaginatedQuery( - ctx, cfg, int64(-1), queryFunc, extractCursor, handleNode, + collectFunc := func(node sqlc.ListNodeIDsAndPubKeysRow) (int64, error) { + return node.ID, nil + } + + batchQueryFunc := func(ctx context.Context, + nodeIDs []int64) (map[int64][]int, error) { + + return batchLoadNodeFeaturesHelper(ctx, cfg, db, nodeIDs) + } + + return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( + ctx, cfg, int64(-1), queryFunc, extractCursor, collectFunc, + batchQueryFunc, handleNode, ) }