sqldb: implement iterator support for ChanUpdatesInHorizon

In this commit, we update the SQL store implementation to support the
new iterator-based API for ChanUpdatesInHorizon. This includes adding
SQL query pagination support and helper functions for efficient batch
processing.

The SQL implementation uses cursor-based pagination with configurable
batch sizes, allowing efficient iteration over large result sets without
loading everything into memory. The query is optimized to use indexes
effectively and minimize database round trips.

New SQL query GetChannelsByPolicyLastUpdateRange is updated to support:
- Cursor-based pagination using (max_update_time, id) compound cursor
- Configurable batch sizes via MaxResults parameter
- Efficient batch caching with updateChanCacheBatch helper
This commit is contained in:
Olaoluwa Osuntokun
2025-08-04 17:45:19 -07:00
parent c69971c20b
commit 31ab2ae4b6
3 changed files with 284 additions and 105 deletions

View File

@@ -1143,18 +1143,39 @@ WHERE c.version = $1
OR
(cp2.last_update >= $2 AND cp2.last_update < $3)
)
-- Pagination using compound cursor (max_update_time, id).
-- We use COALESCE with -1 as sentinel since timestamps are always positive.
AND (
(CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END > COALESCE($4, -1))
OR
(CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END = COALESCE($4, -1)
AND c.id > COALESCE($5, -1))
)
ORDER BY
CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END ASC
END ASC,
c.id ASC
LIMIT COALESCE($6, 999999999)
`
type GetChannelsByPolicyLastUpdateRangeParams struct {
Version int16
StartTime sql.NullInt64
EndTime sql.NullInt64
Version int16
StartTime sql.NullInt64
EndTime sql.NullInt64
LastUpdateTime sql.NullInt64
LastID sql.NullInt64
MaxResults interface{}
}
type GetChannelsByPolicyLastUpdateRangeRow struct {
@@ -1194,7 +1215,14 @@ type GetChannelsByPolicyLastUpdateRangeRow struct {
}
func (q *Queries) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) {
rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, arg.Version, arg.StartTime, arg.EndTime)
rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange,
arg.Version,
arg.StartTime,
arg.EndTime,
arg.LastUpdateTime,
arg.LastID,
arg.MaxResults,
)
if err != nil {
return nil, err
}

View File

@@ -472,12 +472,30 @@ WHERE c.version = @version
OR
(cp2.last_update >= @start_time AND cp2.last_update < @end_time)
)
-- Pagination using compound cursor (max_update_time, id).
-- We use COALESCE with -1 as sentinel since timestamps are always positive.
AND (
(CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END > COALESCE(sqlc.narg('last_update_time'), -1))
OR
(CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END = COALESCE(sqlc.narg('last_update_time'), -1)
AND c.id > COALESCE(sqlc.narg('last_id'), -1))
)
ORDER BY
CASE
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
THEN COALESCE(cp1.last_update, 0)
ELSE COALESCE(cp2.last_update, 0)
END ASC;
END ASC,
c.id ASC
LIMIT COALESCE(sqlc.narg('max_results'), 999999999);
-- name: GetChannelByOutpointWithPolicies :one
SELECT