From c69971c20b63a613adeb6151ed4f9ce619d6974e Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Mon, 4 Aug 2025 17:36:45 -0700
Subject: [PATCH] sqldb: implement iterator support for NodeUpdatesInHorizon

In this commit, we update the SQL store implementation to support the
new iterator-based API for NodeUpdatesInHorizon. This includes updating
the GetNodesByLastUpdateRange SQL query to support efficient pagination
through result sets.

The SQL implementation uses cursor-based pagination with configurable
batch sizes, allowing efficient iteration over large result sets
without loading everything into memory. The query is optimized to use
indexes effectively and minimize database round trips.

The GetNodesByLastUpdateRange SQL query is updated to support:
* Cursor-based pagination using a (last_update, pub_key) compound cursor
* Optional filtering for public nodes only
* Configurable batch sizes via the MaxResults parameter
---
 graph/db/sql_store.go        | 121 +++++++++++++++++++++++++----------
 sqldb/sqlc/graph.sql.go      |  47 ++++++++++++--
 sqldb/sqlc/queries/graph.sql |  30 ++++++++-
 3 files changed, 159 insertions(+), 39 deletions(-)

diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go
index 38485f6c1..55b4b2450 100644
--- a/graph/db/sql_store.go
+++ b/graph/db/sql_store.go
@@ -553,47 +553,100 @@ func (s *SQLStore) SetSourceNode(ctx context.Context,
 // announcements.
 //
 // NOTE: This is part of the V1Store interface.
-func (s *SQLStore) NodeUpdatesInHorizon(startTime,
-	endTime time.Time,
+func (s *SQLStore) NodeUpdatesInHorizon(startTime, endTime time.Time,
 	opts ...IteratorOption) (iter.Seq[models.Node], error) {
 
-	ctx := context.TODO()
-
-	var nodes []models.Node
-	err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
-		dbNodes, err := db.GetNodesByLastUpdateRange(
-			ctx, sqlc.GetNodesByLastUpdateRangeParams{
-				StartTime: sqldb.SQLInt64(startTime.Unix()),
-				EndTime:   sqldb.SQLInt64(endTime.Unix()),
-			},
-		)
-		if err != nil {
-			return fmt.Errorf("unable to fetch nodes: %w", err)
-		}
-
-		err = forEachNodeInBatch(
-			ctx, s.cfg.QueryCfg, db, dbNodes,
-			func(_ int64, node *models.Node) error {
-				nodes = append(nodes, *node)
-
-				return nil
-			},
-		)
-		if err != nil {
-			return fmt.Errorf("unable to build nodes: %w", err)
-		}
-
-		return nil
-	}, sqldb.NoOpReset)
-	if err != nil {
-		return nil, fmt.Errorf("unable to fetch nodes: %w", err)
+	cfg := defaultIteratorConfig()
+	for _, opt := range opts {
+		opt(cfg)
 	}
 
 	return func(yield func(models.Node) bool) {
-		for _, node := range nodes {
-			if !yield(node) {
+		var (
+			ctx            = context.TODO()
+			lastUpdateTime sql.NullInt64
+			lastPubKey     = make([]byte, 33)
+			hasMore        = true
+		)
+
+		// Each iteration, we'll read a batch of nodes, yield
+		// them, then decide if we have more to fetch or not.
+		for hasMore {
+			var batch []models.Node
+
+			//nolint:ll
+			err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
+				//nolint:ll
+				params := sqlc.GetNodesByLastUpdateRangeParams{
+					StartTime: sqldb.SQLInt64(
+						startTime.Unix(),
+					),
+					EndTime: sqldb.SQLInt64(
+						endTime.Unix(),
+					),
+					LastUpdate: lastUpdateTime,
+					LastPubKey: lastPubKey,
+					OnlyPublic: sql.NullBool{
+						Bool:  cfg.iterPublicNodes,
+						Valid: true,
+					},
+					MaxResults: sqldb.SQLInt32(
+						cfg.nodeUpdateIterBatchSize,
+					),
+				}
+				rows, err := db.GetNodesByLastUpdateRange(
+					ctx, params,
+				)
+				if err != nil {
+					return err
+				}
+
+				hasMore = len(rows) == cfg.nodeUpdateIterBatchSize
+
+				err = forEachNodeInBatch(
+					ctx, s.cfg.QueryCfg, db, rows,
+					func(_ int64, node *models.Node) error {
+						batch = append(batch, *node)
+
+						// Update pagination cursors
+						// based on the last processed
+						// node.
+						lastUpdateTime = sql.NullInt64{
+							Int64: node.LastUpdate.
+								Unix(),
+							Valid: true,
+						}
+						lastPubKey = node.PubKeyBytes[:]
+
+						return nil
+					},
+				)
+				if err != nil {
+					return fmt.Errorf("unable to build "+
+						"nodes: %w", err)
+				}
+
+				return nil
+			}, func() {
+				batch = []models.Node{}
+			})
+
+			if err != nil {
+				log.Errorf("NodeUpdatesInHorizon batch "+
+					"error: %v", err)
 				return
 			}
+
+			for _, node := range batch {
+				if !yield(node) {
+					return
+				}
+			}
+
+			// If the batch didn't yield anything, then we're done.
+			if len(batch) == 0 {
+				break
+			}
 		}
 	}, nil
 }
diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go
index 4c2be19ea..8c3f641ed 100644
--- a/sqldb/sqlc/graph.sql.go
+++ b/sqldb/sqlc/graph.sql.go
@@ -1969,16 +1969,55 @@ const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many
 SELECT id, version, pub_key, alias, last_update, color, signature
 FROM graph_nodes
 WHERE last_update >= $1
-  AND last_update < $2
+  AND last_update <= $2
+  -- Pagination: We use (last_update, pub_key) as a compound cursor.
+  -- This ensures stable ordering and allows us to resume from where we left off.
+  -- We use COALESCE with -1 as sentinel since timestamps are always positive.
+  AND (
+    -- Include rows with last_update greater than cursor (or all rows if cursor is -1)
+    last_update > COALESCE($3, -1)
+    OR
+    -- For rows with same last_update, use pub_key as tiebreaker
+    (last_update = COALESCE($3, -1)
+     AND pub_key > $4)
+  )
+  -- Optional filter for public nodes only
+  AND (
+    -- If only_public is false or not provided, include all nodes
+    COALESCE($5, FALSE) IS FALSE
+    OR
+    -- For V1 protocol, a node is public if it has at least one public channel.
+    -- A public channel has bitcoin_1_signature set (channel announcement received).
+ EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE($6, 999999999) ` type GetNodesByLastUpdateRangeParams struct { - StartTime sql.NullInt64 - EndTime sql.NullInt64 + StartTime sql.NullInt64 + EndTime sql.NullInt64 + LastUpdate sql.NullInt64 + LastPubKey []byte + OnlyPublic interface{} + MaxResults interface{} } func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime) + rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, + arg.StartTime, + arg.EndTime, + arg.LastUpdate, + arg.LastPubKey, + arg.OnlyPublic, + arg.MaxResults, + ) if err != nil { return nil, err } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index f2224c00f..0621bf9f8 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -164,7 +164,35 @@ ORDER BY node_id, type, position; SELECT * FROM graph_nodes WHERE last_update >= @start_time - AND last_update < @end_time; + AND last_update <= @end_time + -- Pagination: We use (last_update, pub_key) as a compound cursor. + -- This ensures stable ordering and allows us to resume from where we left off. + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + -- Include rows with last_update greater than cursor (or all rows if cursor is -1) + last_update > COALESCE(sqlc.narg('last_update'), -1) + OR + -- For rows with same last_update, use pub_key as tiebreaker + (last_update = COALESCE(sqlc.narg('last_update'), -1) + AND pub_key > sqlc.narg('last_pub_key')) + ) + -- Optional filter for public nodes only + AND ( + -- If only_public is false or not provided, include all nodes + COALESCE(sqlc.narg('only_public'), FALSE) IS FALSE + OR + -- For V1 protocol, a node is public if it has at least one public channel. + -- A public channel has bitcoin_1_signature set (channel announcement received). + EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE(sqlc.narg('max_results'), 999999999); -- name: DeleteNodeAddresses :exec DELETE FROM graph_node_addresses
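
Reviewer note (not part of the patch): the new NodeUpdatesInHorizon body calls
defaultIteratorConfig and applies IteratorOption values, but that plumbing is
not included in this diff. Below is a minimal sketch of what it could look
like, assuming plain functional options over the two config fields the diff
references (iterPublicNodes and nodeUpdateIterBatchSize). The constructor
names and the default batch size are illustrative assumptions, not the PR's
actual code.

// iteratorConfig groups the knobs consulted by NodeUpdatesInHorizon. The
// field names mirror the diff; everything else here is assumed.
type iteratorConfig struct {
	// iterPublicNodes, when true, restricts iteration to nodes that have
	// at least one announced (public) channel. It maps to the query's
	// only_public argument.
	iterPublicNodes bool

	// nodeUpdateIterBatchSize is the number of rows requested per
	// database transaction while paging through the horizon. It maps to
	// the query's max_results argument.
	nodeUpdateIterBatchSize int
}

// IteratorOption mutates the iterator config before iteration begins.
type IteratorOption func(*iteratorConfig)

// defaultIteratorConfig returns the config used when no options are
// supplied. The batch size below is a placeholder default.
func defaultIteratorConfig() *iteratorConfig {
	return &iteratorConfig{
		nodeUpdateIterBatchSize: 250,
	}
}

// WithPublicNodesOnly is a hypothetical option constructor that toggles the
// public-nodes-only filter.
func WithPublicNodesOnly(public bool) IteratorOption {
	return func(cfg *iteratorConfig) {
		cfg.iterPublicNodes = public
	}
}

// WithNodeUpdateBatchSize is a hypothetical option constructor for tuning
// how many rows each paginated query fetches.
func WithNodeUpdateBatchSize(size int) IteratorOption {
	return func(cfg *iteratorConfig) {
		cfg.nodeUpdateIterBatchSize = size
	}
}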
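
A caller-side sketch of the new API follows, reusing the hypothetical option
constructors above. Only NodeUpdatesInHorizon, its (startTime, endTime,
opts ...IteratorOption) signature, and the iter.Seq[models.Node] return type
come from the diff; the import alias and helper function are illustrative.

package main

import (
	"fmt"
	"time"

	graphdb "github.com/lightningnetwork/lnd/graph/db"
)

// dumpRecentNodes walks every node announcement updated in the last 24
// hours. The iterator pages through graph_nodes with the
// (last_update, pub_key) cursor, so the full result set is never held in
// memory at once.
func dumpRecentNodes(store *graphdb.SQLStore) error {
	end := time.Now()
	start := end.Add(-24 * time.Hour)

	nodeIter, err := store.NodeUpdatesInHorizon(
		start, end,
		graphdb.WithPublicNodesOnly(true),
		graphdb.WithNodeUpdateBatchSize(500),
	)
	if err != nil {
		return err
	}

	// iter.Seq works with range-over-func (Go 1.23+); breaking out of
	// the loop early stops further batches from being queried.
	for node := range nodeIter {
		fmt.Printf("node %x last updated at %v\n",
			node.PubKeyBytes, node.LastUpdate)
	}

	return nil
}

One consequence of the iter.Seq shape worth flagging in review: a query
failure mid-iteration is only logged (log.Errorf) and then ends the sequence,
so a caller cannot distinguish an exhausted horizon from a failed batch.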