mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-11-10 14:17:56 +01:00
sqldb: implement iterator support for NodeUpdatesInHorizon
In this commit, we update the SQL store implementation to support the new iterator-based API for NodeUpdatesInHorizon. This includes adding a new SQL query that supports efficient pagination through result sets. The SQL implementation uses cursor-based pagination with configurable batch sizes, allowing efficient iteration over large result sets without loading everything into memory. The query is optimized to use indexes effectively and minimize database round trips. New SQL query GetNodesByLastUpdateRange is updated to support: * Cursor-based pagination using (last_update, pub_key) compound cursor * Optional filtering for public nodes only * Configurable batch sizes via MaxResults parameter
This commit is contained in:
@@ -553,47 +553,100 @@ func (s *SQLStore) SetSourceNode(ctx context.Context,
|
||||
// announcements.
|
||||
//
|
||||
// NOTE: This is part of the V1Store interface.
|
||||
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
|
||||
endTime time.Time,
|
||||
func (s *SQLStore) NodeUpdatesInHorizon(startTime, endTime time.Time,
|
||||
opts ...IteratorOption) (iter.Seq[models.Node], error) {
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
var nodes []models.Node
|
||||
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
dbNodes, err := db.GetNodesByLastUpdateRange(
|
||||
ctx, sqlc.GetNodesByLastUpdateRangeParams{
|
||||
StartTime: sqldb.SQLInt64(startTime.Unix()),
|
||||
EndTime: sqldb.SQLInt64(endTime.Unix()),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch nodes: %w", err)
|
||||
}
|
||||
|
||||
err = forEachNodeInBatch(
|
||||
ctx, s.cfg.QueryCfg, db, dbNodes,
|
||||
func(_ int64, node *models.Node) error {
|
||||
nodes = append(nodes, *node)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build nodes: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, sqldb.NoOpReset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to fetch nodes: %w", err)
|
||||
cfg := defaultIteratorConfig()
|
||||
for _, opt := range opts {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
return func(yield func(models.Node) bool) {
|
||||
for _, node := range nodes {
|
||||
if !yield(node) {
|
||||
var (
|
||||
ctx = context.TODO()
|
||||
lastUpdateTime sql.NullInt64
|
||||
lastPubKey = make([]byte, 33)
|
||||
hasMore = true
|
||||
)
|
||||
|
||||
// Each iteration, we'll read a batch amount of nodes, yield
|
||||
// them, then decide is we have more or not.
|
||||
for hasMore {
|
||||
var batch []models.Node
|
||||
|
||||
//nolint:ll
|
||||
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
//nolint:ll
|
||||
params := sqlc.GetNodesByLastUpdateRangeParams{
|
||||
StartTime: sqldb.SQLInt64(
|
||||
startTime.Unix(),
|
||||
),
|
||||
EndTime: sqldb.SQLInt64(
|
||||
endTime.Unix(),
|
||||
),
|
||||
LastUpdate: lastUpdateTime,
|
||||
LastPubKey: lastPubKey,
|
||||
OnlyPublic: sql.NullBool{
|
||||
Bool: cfg.iterPublicNodes,
|
||||
Valid: true,
|
||||
},
|
||||
MaxResults: sqldb.SQLInt32(
|
||||
cfg.nodeUpdateIterBatchSize,
|
||||
),
|
||||
}
|
||||
rows, err := db.GetNodesByLastUpdateRange(
|
||||
ctx, params,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hasMore = len(rows) == cfg.nodeUpdateIterBatchSize
|
||||
|
||||
err = forEachNodeInBatch(
|
||||
ctx, s.cfg.QueryCfg, db, rows,
|
||||
func(_ int64, node *models.Node) error {
|
||||
batch = append(batch, *node)
|
||||
|
||||
// Update pagination cursors
|
||||
// based on the last processed
|
||||
// node.
|
||||
lastUpdateTime = sql.NullInt64{
|
||||
Int64: node.LastUpdate.
|
||||
Unix(),
|
||||
Valid: true,
|
||||
}
|
||||
lastPubKey = node.PubKeyBytes[:]
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build "+
|
||||
"nodes: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
batch = []models.Node{}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("NodeUpdatesInHorizon batch "+
|
||||
"error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, node := range batch {
|
||||
if !yield(node) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If the batch didn't yield anything, then we're done.
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -1969,16 +1969,55 @@ const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many
|
||||
SELECT id, version, pub_key, alias, last_update, color, signature
|
||||
FROM graph_nodes
|
||||
WHERE last_update >= $1
|
||||
AND last_update < $2
|
||||
AND last_update <= $2
|
||||
-- Pagination: We use (last_update, pub_key) as a compound cursor.
|
||||
-- This ensures stable ordering and allows us to resume from where we left off.
|
||||
-- We use COALESCE with -1 as sentinel since timestamps are always positive.
|
||||
AND (
|
||||
-- Include rows with last_update greater than cursor (or all rows if cursor is -1)
|
||||
last_update > COALESCE($3, -1)
|
||||
OR
|
||||
-- For rows with same last_update, use pub_key as tiebreaker
|
||||
(last_update = COALESCE($3, -1)
|
||||
AND pub_key > $4)
|
||||
)
|
||||
-- Optional filter for public nodes only
|
||||
AND (
|
||||
-- If only_public is false or not provided, include all nodes
|
||||
COALESCE($5, FALSE) IS FALSE
|
||||
OR
|
||||
-- For V1 protocol, a node is public if it has at least one public channel.
|
||||
-- A public channel has bitcoin_1_signature set (channel announcement received).
|
||||
EXISTS (
|
||||
SELECT 1
|
||||
FROM graph_channels c
|
||||
WHERE c.version = 1
|
||||
AND c.bitcoin_1_signature IS NOT NULL
|
||||
AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id)
|
||||
)
|
||||
)
|
||||
ORDER BY last_update ASC, pub_key ASC
|
||||
LIMIT COALESCE($6, 999999999)
|
||||
`
|
||||
|
||||
type GetNodesByLastUpdateRangeParams struct {
|
||||
StartTime sql.NullInt64
|
||||
EndTime sql.NullInt64
|
||||
StartTime sql.NullInt64
|
||||
EndTime sql.NullInt64
|
||||
LastUpdate sql.NullInt64
|
||||
LastPubKey []byte
|
||||
OnlyPublic interface{}
|
||||
MaxResults interface{}
|
||||
}
|
||||
|
||||
func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]GraphNode, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime)
|
||||
rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange,
|
||||
arg.StartTime,
|
||||
arg.EndTime,
|
||||
arg.LastUpdate,
|
||||
arg.LastPubKey,
|
||||
arg.OnlyPublic,
|
||||
arg.MaxResults,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -164,7 +164,35 @@ ORDER BY node_id, type, position;
|
||||
SELECT *
|
||||
FROM graph_nodes
|
||||
WHERE last_update >= @start_time
|
||||
AND last_update < @end_time;
|
||||
AND last_update <= @end_time
|
||||
-- Pagination: We use (last_update, pub_key) as a compound cursor.
|
||||
-- This ensures stable ordering and allows us to resume from where we left off.
|
||||
-- We use COALESCE with -1 as sentinel since timestamps are always positive.
|
||||
AND (
|
||||
-- Include rows with last_update greater than cursor (or all rows if cursor is -1)
|
||||
last_update > COALESCE(sqlc.narg('last_update'), -1)
|
||||
OR
|
||||
-- For rows with same last_update, use pub_key as tiebreaker
|
||||
(last_update = COALESCE(sqlc.narg('last_update'), -1)
|
||||
AND pub_key > sqlc.narg('last_pub_key'))
|
||||
)
|
||||
-- Optional filter for public nodes only
|
||||
AND (
|
||||
-- If only_public is false or not provided, include all nodes
|
||||
COALESCE(sqlc.narg('only_public'), FALSE) IS FALSE
|
||||
OR
|
||||
-- For V1 protocol, a node is public if it has at least one public channel.
|
||||
-- A public channel has bitcoin_1_signature set (channel announcement received).
|
||||
EXISTS (
|
||||
SELECT 1
|
||||
FROM graph_channels c
|
||||
WHERE c.version = 1
|
||||
AND c.bitcoin_1_signature IS NOT NULL
|
||||
AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id)
|
||||
)
|
||||
)
|
||||
ORDER BY last_update ASC, pub_key ASC
|
||||
LIMIT COALESCE(sqlc.narg('max_results'), 999999999);
|
||||
|
||||
-- name: DeleteNodeAddresses :exec
|
||||
DELETE FROM graph_node_addresses
|
||||
|
||||
Reference in New Issue
Block a user