diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 38485f6c1..55b4b2450 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -553,47 +553,100 @@ func (s *SQLStore) SetSourceNode(ctx context.Context, // announcements. // // NOTE: This is part of the V1Store interface. -func (s *SQLStore) NodeUpdatesInHorizon(startTime, - endTime time.Time, +func (s *SQLStore) NodeUpdatesInHorizon(startTime, endTime time.Time, opts ...IteratorOption) (iter.Seq[models.Node], error) { - ctx := context.TODO() - - var nodes []models.Node - err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - dbNodes, err := db.GetNodesByLastUpdateRange( - ctx, sqlc.GetNodesByLastUpdateRangeParams{ - StartTime: sqldb.SQLInt64(startTime.Unix()), - EndTime: sqldb.SQLInt64(endTime.Unix()), - }, - ) - if err != nil { - return fmt.Errorf("unable to fetch nodes: %w", err) - } - - err = forEachNodeInBatch( - ctx, s.cfg.QueryCfg, db, dbNodes, - func(_ int64, node *models.Node) error { - nodes = append(nodes, *node) - - return nil - }, - ) - if err != nil { - return fmt.Errorf("unable to build nodes: %w", err) - } - - return nil - }, sqldb.NoOpReset) - if err != nil { - return nil, fmt.Errorf("unable to fetch nodes: %w", err) + cfg := defaultIteratorConfig() + for _, opt := range opts { + opt(cfg) } return func(yield func(models.Node) bool) { - for _, node := range nodes { - if !yield(node) { + var ( + ctx = context.TODO() + lastUpdateTime sql.NullInt64 + lastPubKey = make([]byte, 33) + hasMore = true + ) + + // Each iteration, we'll read a batch amount of nodes, yield + // them, then decide is we have more or not. + for hasMore { + var batch []models.Node + + //nolint:ll + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + //nolint:ll + params := sqlc.GetNodesByLastUpdateRangeParams{ + StartTime: sqldb.SQLInt64( + startTime.Unix(), + ), + EndTime: sqldb.SQLInt64( + endTime.Unix(), + ), + LastUpdate: lastUpdateTime, + LastPubKey: lastPubKey, + OnlyPublic: sql.NullBool{ + Bool: cfg.iterPublicNodes, + Valid: true, + }, + MaxResults: sqldb.SQLInt32( + cfg.nodeUpdateIterBatchSize, + ), + } + rows, err := db.GetNodesByLastUpdateRange( + ctx, params, + ) + if err != nil { + return err + } + + hasMore = len(rows) == cfg.nodeUpdateIterBatchSize + + err = forEachNodeInBatch( + ctx, s.cfg.QueryCfg, db, rows, + func(_ int64, node *models.Node) error { + batch = append(batch, *node) + + // Update pagination cursors + // based on the last processed + // node. + lastUpdateTime = sql.NullInt64{ + Int64: node.LastUpdate. + Unix(), + Valid: true, + } + lastPubKey = node.PubKeyBytes[:] + + return nil + }, + ) + if err != nil { + return fmt.Errorf("unable to build "+ + "nodes: %w", err) + } + + return nil + }, func() { + batch = []models.Node{} + }) + + if err != nil { + log.Errorf("NodeUpdatesInHorizon batch "+ + "error: %v", err) return } + + for _, node := range batch { + if !yield(node) { + return + } + } + + // If the batch didn't yield anything, then we're done. + if len(batch) == 0 { + break + } } }, nil } diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 4c2be19ea..8c3f641ed 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -1969,16 +1969,55 @@ const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many SELECT id, version, pub_key, alias, last_update, color, signature FROM graph_nodes WHERE last_update >= $1 - AND last_update < $2 + AND last_update <= $2 + -- Pagination: We use (last_update, pub_key) as a compound cursor. + -- This ensures stable ordering and allows us to resume from where we left off. + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + -- Include rows with last_update greater than cursor (or all rows if cursor is -1) + last_update > COALESCE($3, -1) + OR + -- For rows with same last_update, use pub_key as tiebreaker + (last_update = COALESCE($3, -1) + AND pub_key > $4) + ) + -- Optional filter for public nodes only + AND ( + -- If only_public is false or not provided, include all nodes + COALESCE($5, FALSE) IS FALSE + OR + -- For V1 protocol, a node is public if it has at least one public channel. + -- A public channel has bitcoin_1_signature set (channel announcement received). + EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE($6, 999999999) ` type GetNodesByLastUpdateRangeParams struct { - StartTime sql.NullInt64 - EndTime sql.NullInt64 + StartTime sql.NullInt64 + EndTime sql.NullInt64 + LastUpdate sql.NullInt64 + LastPubKey []byte + OnlyPublic interface{} + MaxResults interface{} } func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime) + rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, + arg.StartTime, + arg.EndTime, + arg.LastUpdate, + arg.LastPubKey, + arg.OnlyPublic, + arg.MaxResults, + ) if err != nil { return nil, err } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index f2224c00f..0621bf9f8 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -164,7 +164,35 @@ ORDER BY node_id, type, position; SELECT * FROM graph_nodes WHERE last_update >= @start_time - AND last_update < @end_time; + AND last_update <= @end_time + -- Pagination: We use (last_update, pub_key) as a compound cursor. + -- This ensures stable ordering and allows us to resume from where we left off. + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + -- Include rows with last_update greater than cursor (or all rows if cursor is -1) + last_update > COALESCE(sqlc.narg('last_update'), -1) + OR + -- For rows with same last_update, use pub_key as tiebreaker + (last_update = COALESCE(sqlc.narg('last_update'), -1) + AND pub_key > sqlc.narg('last_pub_key')) + ) + -- Optional filter for public nodes only + AND ( + -- If only_public is false or not provided, include all nodes + COALESCE(sqlc.narg('only_public'), FALSE) IS FALSE + OR + -- For V1 protocol, a node is public if it has at least one public channel. + -- A public channel has bitcoin_1_signature set (channel announcement received). + EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE(sqlc.narg('max_results'), 999999999); -- name: DeleteNodeAddresses :exec DELETE FROM graph_node_addresses