diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 55b4b2450..2d4d06774 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -981,117 +981,250 @@ func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex, }, reset) } -// ChanUpdatesInHorizon returns all the known channel edges which have at least -// one edge that has an update timestamp within the specified horizon. -// -// NOTE: This is part of the V1Store interface. -func (s *SQLStore) ChanUpdatesInHorizon(startTime, - endTime time.Time, - opts ...IteratorOption) (iter.Seq[ChannelEdge], error) { +// extractMaxUpdateTime returns the maximum of the two policy update times. +// This is used for pagination cursor tracking. +func extractMaxUpdateTime( + row sqlc.GetChannelsByPolicyLastUpdateRangeRow) int64 { + + switch { + case row.Policy1LastUpdate.Valid && row.Policy2LastUpdate.Valid: + return max(row.Policy1LastUpdate.Int64, + row.Policy2LastUpdate.Int64) + case row.Policy1LastUpdate.Valid: + return row.Policy1LastUpdate.Int64 + case row.Policy2LastUpdate.Valid: + return row.Policy2LastUpdate.Int64 + default: + return 0 + } +} + +// buildChannelFromRow constructs a ChannelEdge from a database row. +// This includes building the nodes, channel info, and policies. +func (s *SQLStore) buildChannelFromRow(ctx context.Context, db SQLQueries, + row sqlc.GetChannelsByPolicyLastUpdateRangeRow) (ChannelEdge, error) { + + node1, err := buildNode(ctx, s.cfg.QueryCfg, db, row.GraphNode) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build node1: %w", + err) + } + + node2, err := buildNode(ctx, s.cfg.QueryCfg, db, row.GraphNode_2) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build node2: %w", + err) + } + + channel, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, + row.GraphChannel, node1.PubKeyBytes, + node2.PubKeyBytes, + ) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build "+ + "channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to extract "+ + "channel policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, channel.ChannelID, + node1.PubKeyBytes, node2.PubKeyBytes, + ) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build "+ + "channel policies: %w", err) + } + + return ChannelEdge{ + Info: channel, + Policy1: p1, + Policy2: p2, + Node1: node1, + Node2: node2, + }, nil +} + +// updateChanCacheBatch updates the channel cache with multiple edges at once. +// This method acquires the cache lock only once for the entire batch. +func (s *SQLStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) { + if len(edgesToCache) == 0 { + return + } s.cacheMu.Lock() defer s.cacheMu.Unlock() - var ( - ctx = context.TODO() - // To ensure we don't return duplicate ChannelEdges, we'll use - // an additional map to keep track of the edges already seen to - // prevent re-adding it. - edgesSeen = make(map[uint64]struct{}) - edgesToCache = make(map[uint64]ChannelEdge) - edges []ChannelEdge - hits int - ) - - err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - rows, err := db.GetChannelsByPolicyLastUpdateRange( - ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{ - Version: int16(ProtocolV1), - StartTime: sqldb.SQLInt64(startTime.Unix()), - EndTime: sqldb.SQLInt64(endTime.Unix()), - }, - ) - if err != nil { - return err - } - - if len(rows) == 0 { - return nil - } - - // We'll pre-allocate the slices and maps here with a best - // effort size in order to avoid unnecessary allocations later - // on. - uncachedRows := make( - []sqlc.GetChannelsByPolicyLastUpdateRangeRow, 0, - len(rows), - ) - edgesToCache = make(map[uint64]ChannelEdge, len(rows)) - edgesSeen = make(map[uint64]struct{}, len(rows)) - edges = make([]ChannelEdge, 0, len(rows)) - - // Separate cached from non-cached channels since we will only - // batch load the data for the ones we haven't cached yet. - for _, row := range rows { - chanIDInt := byteOrder.Uint64(row.GraphChannel.Scid) - - // Skip duplicates. - if _, ok := edgesSeen[chanIDInt]; ok { - continue - } - edgesSeen[chanIDInt] = struct{}{} - - // Check cache first. - if channel, ok := s.chanCache.get(chanIDInt); ok { - hits++ - edges = append(edges, channel) - continue - } - - // Mark this row as one we need to batch load data for. - uncachedRows = append(uncachedRows, row) - } - - // If there are no uncached rows, then we can return early. - if len(uncachedRows) == 0 { - return nil - } - - // Batch load data for all uncached channels. - newEdges, err := batchBuildChannelEdges( - ctx, s.cfg, db, uncachedRows, - ) - if err != nil { - return fmt.Errorf("unable to batch build channel "+ - "edges: %w", err) - } - - edges = append(edges, newEdges...) - - return nil - }, sqldb.NoOpReset) - if err != nil { - return nil, fmt.Errorf("unable to fetch channels: %w", err) + for chanID, edge := range edgesToCache { + s.chanCache.insert(chanID, edge) } +} - // Insert any edges loaded from disk into the cache. - for chanid, channel := range edgesToCache { - s.chanCache.insert(chanid, channel) - } +// ChanUpdatesInHorizon returns all the known channel edges which have at least +// one edge that has an update timestamp within the specified horizon. +// +// Iterator Lifecycle: +// 1. Initialize state (edgesSeen map, cache tracking, pagination cursors) +// 2. Query batch of channels with policies in time range +// 3. For each channel: check if seen, check cache, or build from DB +// 4. Yield channels to caller +// 5. Update cache after successful batch +// 6. Repeat with updated pagination cursor until no more results +// +// NOTE: This is part of the V1Store interface. +func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, + opts ...IteratorOption) (iter.Seq[ChannelEdge], error) { - if len(edges) > 0 { - log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)", - float64(hits)*100/float64(len(edges)), hits, len(edges)) - } else { - log.Debugf("ChanUpdatesInHorizon returned no edges in "+ - "horizon (%s, %s)", startTime, endTime) + // Apply options. + cfg := defaultIteratorConfig() + for _, opt := range opts { + opt(cfg) } return func(yield func(ChannelEdge) bool) { - for _, edge := range edges { - if !yield(edge) { + var ( + ctx = context.TODO() + edgesSeen = make(map[uint64]struct{}) + edgesToCache = make(map[uint64]ChannelEdge) + hits int + total int + lastUpdateTime sql.NullInt64 + lastID sql.NullInt64 + hasMore = true + ) + + // Each iteration, we'll read a batch amount of channel updates + // (consulting the cache along the way), yield them, then loop + // back to decide if we have any more updates to read out. + for hasMore { + var batch []ChannelEdge + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), + func(db SQLQueries) error { + //nolint:ll + params := sqlc.GetChannelsByPolicyLastUpdateRangeParams{ + Version: int16(ProtocolV1), + StartTime: sqldb.SQLInt64( + startTime.Unix(), + ), + EndTime: sqldb.SQLInt64( + endTime.Unix(), + ), + LastUpdateTime: lastUpdateTime, + LastID: lastID, + MaxResults: sql.NullInt32{ + Int32: int32( + cfg.chanUpdateIterBatchSize, + ), + Valid: true, + }, + } + //nolint:ll + rows, err := db.GetChannelsByPolicyLastUpdateRange( + ctx, params, + ) + if err != nil { + return err + } + + //nolint:ll + hasMore = len(rows) == cfg.chanUpdateIterBatchSize + + //nolint:ll + for _, row := range rows { + lastUpdateTime = sql.NullInt64{ + Int64: extractMaxUpdateTime(row), + Valid: true, + } + lastID = sql.NullInt64{ + Int64: row.GraphChannel.ID, + Valid: true, + } + + // Skip if we've already + // processed this channel. + chanIDInt := byteOrder.Uint64( + row.GraphChannel.Scid, + ) + _, ok := edgesSeen[chanIDInt] + if ok { + continue + } + + s.cacheMu.RLock() + channel, ok := s.chanCache.get( + chanIDInt, + ) + s.cacheMu.RUnlock() + if ok { + hits++ + total++ + edgesSeen[chanIDInt] = struct{}{} + batch = append(batch, channel) + + continue + } + + chanEdge, err := s.buildChannelFromRow( + ctx, db, row, + ) + if err != nil { + return err + } + + edgesSeen[chanIDInt] = struct{}{} + edgesToCache[chanIDInt] = chanEdge + + batch = append(batch, chanEdge) + + total++ + } + + return nil + }, func() { + batch = nil + edgesSeen = make(map[uint64]struct{}) + edgesToCache = make( + map[uint64]ChannelEdge, + ) + }) + + if err != nil { + log.Errorf("ChanUpdatesInHorizon "+ + "batch error: %v", err) + return } + + for _, edge := range batch { + if !yield(edge) { + return + } + } + + // Update cache after successful batch yield, setting + // the cache lock only once for the entire batch. + s.updateChanCacheBatch(edgesToCache) + edgesToCache = make(map[uint64]ChannelEdge) + + // If the batch didn't yield anything, then we're done. + if len(batch) == 0 { + break + } + } + + if total > 0 { + log.Debugf("ChanUpdatesInHorizon hit percentage: "+ + "%.2f (%d/%d)", + float64(hits)*100/float64(total), hits, total) + } else { + log.Debugf("ChanUpdatesInHorizon returned no edges "+ + "in horizon (%s, %s)", startTime, endTime) } }, nil } diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 8c3f641ed..9c2702737 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -1143,18 +1143,39 @@ WHERE c.version = $1 OR (cp2.last_update >= $2 AND cp2.last_update < $3) ) + -- Pagination using compound cursor (max_update_time, id). + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END > COALESCE($4, -1)) + OR + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END = COALESCE($4, -1) + AND c.id > COALESCE($5, -1)) + ) ORDER BY CASE WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) THEN COALESCE(cp1.last_update, 0) ELSE COALESCE(cp2.last_update, 0) - END ASC + END ASC, + c.id ASC +LIMIT COALESCE($6, 999999999) ` type GetChannelsByPolicyLastUpdateRangeParams struct { - Version int16 - StartTime sql.NullInt64 - EndTime sql.NullInt64 + Version int16 + StartTime sql.NullInt64 + EndTime sql.NullInt64 + LastUpdateTime sql.NullInt64 + LastID sql.NullInt64 + MaxResults interface{} } type GetChannelsByPolicyLastUpdateRangeRow struct { @@ -1194,7 +1215,14 @@ type GetChannelsByPolicyLastUpdateRangeRow struct { } func (q *Queries) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) { - rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, arg.Version, arg.StartTime, arg.EndTime) + rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, + arg.Version, + arg.StartTime, + arg.EndTime, + arg.LastUpdateTime, + arg.LastID, + arg.MaxResults, + ) if err != nil { return nil, err } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 0621bf9f8..19087fc1b 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -472,12 +472,30 @@ WHERE c.version = @version OR (cp2.last_update >= @start_time AND cp2.last_update < @end_time) ) + -- Pagination using compound cursor (max_update_time, id). + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END > COALESCE(sqlc.narg('last_update_time'), -1)) + OR + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END = COALESCE(sqlc.narg('last_update_time'), -1) + AND c.id > COALESCE(sqlc.narg('last_id'), -1)) + ) ORDER BY CASE WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) THEN COALESCE(cp1.last_update, 0) ELSE COALESCE(cp2.last_update, 0) - END ASC; + END ASC, + c.id ASC +LIMIT COALESCE(sqlc.narg('max_results'), 999999999); -- name: GetChannelByOutpointWithPolicies :one SELECT