From ddc0e95edac64ecb7167bce0398ac2db7e86b8fc Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 18 Jul 2025 11:47:25 +0200 Subject: [PATCH] graph/db+sqldb: delete channels in batches Use the new `SLICES` directive to add a DeleteChannels query which takes a set of DB channel IDs. Then replace all our calls to DeleteChannel with a paginated call to DeleteChannels. --- graph/db/sql_store.go | 66 ++++++++++++++++++++++++------------ sqldb/sqlc/graph.sql.go | 29 +++++++++++----- sqldb/sqlc/querier.go | 2 +- sqldb/sqlc/queries/graph.sql | 5 +-- 4 files changed, 69 insertions(+), 33 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 1c02a7b89..081154f0f 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -108,7 +108,7 @@ type SQLQueries interface { GetChannelByOutpointWithPolicies(ctx context.Context, arg sqlc.GetChannelByOutpointWithPoliciesParams) (sqlc.GetChannelByOutpointWithPoliciesRow, error) GetPublicV1ChannelsBySCID(ctx context.Context, arg sqlc.GetPublicV1ChannelsBySCIDParams) ([]sqlc.GraphChannel, error) GetSCIDByOutpoint(ctx context.Context, arg sqlc.GetSCIDByOutpointParams) ([]byte, error) - DeleteChannel(ctx context.Context, id int64) error + DeleteChannels(ctx context.Context, ids []int64) error CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error @@ -1725,6 +1725,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, deleted []*models.ChannelEdgeInfo ) err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { + chanIDsToDelete := make([]int64, 0, len(chanIDs)) chanCallBack := func(ctx context.Context, row sqlc.GetChannelsBySCIDWithPoliciesRow) error { @@ -1748,13 +1749,10 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, return err } - err = db.DeleteChannel(ctx, row.GraphChannel.ID) - if err != nil { - return fmt.Errorf("unable to delete "+ - "channel: %w", err) - } - deleted = append(deleted, info) + chanIDsToDelete = append( + chanIDsToDelete, row.GraphChannel.ID, + ) if !markZombie { return nil @@ -1809,7 +1807,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, return ErrEdgeNotFound } - return nil + return s.deleteChannels(ctx, db, chanIDsToDelete) }, func() { deleted = nil @@ -2462,6 +2460,8 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint, prunedNodes []route.Vertex ) err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error { + var chansToDelete []int64 + // Define the callback function for processing each channel. channelCallback := func(ctx context.Context, row sqlc.GetChannelsByOutpointsRow) error { @@ -2481,13 +2481,10 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint, return err } - err = db.DeleteChannel(ctx, row.GraphChannel.ID) - if err != nil { - return fmt.Errorf("unable to delete "+ - "channel: %w", err) - } - closedChans = append(closedChans, info) + chansToDelete = append( + chansToDelete, row.GraphChannel.ID, + ) return nil } @@ -2500,6 +2497,11 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint, "outpoints: %w", err) } + err = s.deleteChannels(ctx, db, chansToDelete) + if err != nil { + return fmt.Errorf("unable to delete channels: %w", err) + } + err = db.UpsertPruneLogEntry( ctx, sqlc.UpsertPruneLogEntryParams{ BlockHash: blockHash[:], @@ -2565,6 +2567,27 @@ func (s *SQLStore) forEachChanInOutpoints(ctx context.Context, db SQLQueries, ) } +func (s *SQLStore) deleteChannels(ctx context.Context, db SQLQueries, + dbIDs []int64) error { + + // Create a wrapper that uses the transaction's db instance to execute + // the query. + queryWrapper := func(ctx context.Context, ids []int64) ([]any, error) { + return nil, db.DeleteChannels(ctx, ids) + } + + idConverter := func(id int64) int64 { + return id + } + + return sqldb.ExecutePagedQuery( + ctx, s.cfg.PaginationCfg, dbIDs, idConverter, + queryWrapper, func(ctx context.Context, _ any) error { + return nil + }, + ) +} + // ChannelView returns the verifiable edge information for each active channel // within the known channel graph. The set of UTXOs (along with their scripts) // returned are the ones that need to be watched on chain to detect channel @@ -2740,7 +2763,8 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) ( return fmt.Errorf("unable to fetch channels: %w", err) } - for _, row := range rows { + chanIDsToDelete := make([]int64, len(rows)) + for i, row := range rows { node1, node2, err := buildNodeVertices( row.Node1PubKey, row.Node2PubKey, ) @@ -2756,15 +2780,15 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) ( return err } - err = db.DeleteChannel(ctx, row.GraphChannel.ID) - if err != nil { - return fmt.Errorf("unable to delete "+ - "channel: %w", err) - } - + chanIDsToDelete[i] = row.GraphChannel.ID removedChans = append(removedChans, channel) } + err = s.deleteChannels(ctx, db, chanIDsToDelete) + if err != nil { + return fmt.Errorf("unable to delete channels: %w", err) + } + return db.DeletePruneLogEntriesInRange( ctx, sqlc.DeletePruneLogEntriesInRangeParams{ StartHeight: int64(height), diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 2f0ba8d67..89a92927f 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -143,15 +143,6 @@ func (q *Queries) CreateChannelExtraType(ctx context.Context, arg CreateChannelE return err } -const deleteChannel = `-- name: DeleteChannel :exec -DELETE FROM graph_channels WHERE id = $1 -` - -func (q *Queries) DeleteChannel(ctx context.Context, id int64) error { - _, err := q.db.ExecContext(ctx, deleteChannel, id) - return err -} - const deleteChannelPolicyExtraTypes = `-- name: DeleteChannelPolicyExtraTypes :exec DELETE FROM graph_channel_policy_extra_types WHERE channel_policy_id = $1 @@ -162,6 +153,26 @@ func (q *Queries) DeleteChannelPolicyExtraTypes(ctx context.Context, channelPoli return err } +const deleteChannels = `-- name: DeleteChannels :exec +DELETE FROM graph_channels +WHERE id IN (/*SLICE:ids*/?) +` + +func (q *Queries) DeleteChannels(ctx context.Context, ids []int64) error { + query := deleteChannels + var queryParams []interface{} + if len(ids) > 0 { + for _, v := range ids { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:ids*/?", makeQueryParams(len(queryParams), len(ids)), 1) + } else { + query = strings.Replace(query, "/*SLICE:ids*/?", "NULL", 1) + } + _, err := q.db.ExecContext(ctx, query, queryParams...) + return err +} + const deleteExtraNodeType = `-- name: DeleteExtraNodeType :exec DELETE FROM graph_node_extra_types WHERE node_id = $1 diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index cd32dc75b..9b9d010b4 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -18,8 +18,8 @@ type Querier interface { CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error) CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error DeleteCanceledInvoices(ctx context.Context) (sql.Result, error) - DeleteChannel(ctx context.Context, id int64) error DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error + DeleteChannels(ctx context.Context, ids []int64) error DeleteExtraNodeType(ctx context.Context, arg DeleteExtraNodeTypeParams) error DeleteInvoice(ctx context.Context, arg DeleteInvoiceParams) (sql.Result, error) DeleteNode(ctx context.Context, id int64) error diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 52c09e23d..8551a7706 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -569,8 +569,9 @@ WHERE c.version = $1 AND c.id > $2 ORDER BY c.id LIMIT $3; --- name: DeleteChannel :exec -DELETE FROM graph_channels WHERE id = $1; +-- name: DeleteChannels :exec +DELETE FROM graph_channels +WHERE id IN (sqlc.slice('ids')/*SLICE:ids*/); /* ───────────────────────────────────────────── graph_channel_features table queries