graph/db+sqldb: delete channels in batches

Use the new `SLICES` directive to add a DeleteChannels query which takes
a set of DB channel IDs. Then replace all our calls to DeleteChannel
with a paginated call to DeleteChannels.
This commit is contained in:
Elle Mouton
2025-07-18 11:47:25 +02:00
parent de6c030f29
commit ddc0e95eda
4 changed files with 69 additions and 33 deletions

View File

@@ -108,7 +108,7 @@ type SQLQueries interface {
GetChannelByOutpointWithPolicies(ctx context.Context, arg sqlc.GetChannelByOutpointWithPoliciesParams) (sqlc.GetChannelByOutpointWithPoliciesRow, error)
GetPublicV1ChannelsBySCID(ctx context.Context, arg sqlc.GetPublicV1ChannelsBySCIDParams) ([]sqlc.GraphChannel, error)
GetSCIDByOutpoint(ctx context.Context, arg sqlc.GetSCIDByOutpointParams) ([]byte, error)
DeleteChannel(ctx context.Context, id int64) error
DeleteChannels(ctx context.Context, ids []int64) error
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
@@ -1725,6 +1725,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
deleted []*models.ChannelEdgeInfo
)
err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
chanIDsToDelete := make([]int64, 0, len(chanIDs))
chanCallBack := func(ctx context.Context,
row sqlc.GetChannelsBySCIDWithPoliciesRow) error {
@@ -1748,13 +1749,10 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
return err
}
err = db.DeleteChannel(ctx, row.GraphChannel.ID)
if err != nil {
return fmt.Errorf("unable to delete "+
"channel: %w", err)
}
deleted = append(deleted, info)
chanIDsToDelete = append(
chanIDsToDelete, row.GraphChannel.ID,
)
if !markZombie {
return nil
@@ -1809,7 +1807,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
return ErrEdgeNotFound
}
return nil
return s.deleteChannels(ctx, db, chanIDsToDelete)
}, func() {
deleted = nil
@@ -2462,6 +2460,8 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
prunedNodes []route.Vertex
)
err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
var chansToDelete []int64
// Define the callback function for processing each channel.
channelCallback := func(ctx context.Context,
row sqlc.GetChannelsByOutpointsRow) error {
@@ -2481,13 +2481,10 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
return err
}
err = db.DeleteChannel(ctx, row.GraphChannel.ID)
if err != nil {
return fmt.Errorf("unable to delete "+
"channel: %w", err)
}
closedChans = append(closedChans, info)
chansToDelete = append(
chansToDelete, row.GraphChannel.ID,
)
return nil
}
@@ -2500,6 +2497,11 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
"outpoints: %w", err)
}
err = s.deleteChannels(ctx, db, chansToDelete)
if err != nil {
return fmt.Errorf("unable to delete channels: %w", err)
}
err = db.UpsertPruneLogEntry(
ctx, sqlc.UpsertPruneLogEntryParams{
BlockHash: blockHash[:],
@@ -2565,6 +2567,27 @@ func (s *SQLStore) forEachChanInOutpoints(ctx context.Context, db SQLQueries,
)
}
func (s *SQLStore) deleteChannels(ctx context.Context, db SQLQueries,
dbIDs []int64) error {
// Create a wrapper that uses the transaction's db instance to execute
// the query.
queryWrapper := func(ctx context.Context, ids []int64) ([]any, error) {
return nil, db.DeleteChannels(ctx, ids)
}
idConverter := func(id int64) int64 {
return id
}
return sqldb.ExecutePagedQuery(
ctx, s.cfg.PaginationCfg, dbIDs, idConverter,
queryWrapper, func(ctx context.Context, _ any) error {
return nil
},
)
}
// ChannelView returns the verifiable edge information for each active channel
// within the known channel graph. The set of UTXOs (along with their scripts)
// returned are the ones that need to be watched on chain to detect channel
@@ -2740,7 +2763,8 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
return fmt.Errorf("unable to fetch channels: %w", err)
}
for _, row := range rows {
chanIDsToDelete := make([]int64, len(rows))
for i, row := range rows {
node1, node2, err := buildNodeVertices(
row.Node1PubKey, row.Node2PubKey,
)
@@ -2756,15 +2780,15 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
return err
}
err = db.DeleteChannel(ctx, row.GraphChannel.ID)
if err != nil {
return fmt.Errorf("unable to delete "+
"channel: %w", err)
}
chanIDsToDelete[i] = row.GraphChannel.ID
removedChans = append(removedChans, channel)
}
err = s.deleteChannels(ctx, db, chanIDsToDelete)
if err != nil {
return fmt.Errorf("unable to delete channels: %w", err)
}
return db.DeletePruneLogEntriesInRange(
ctx, sqlc.DeletePruneLogEntriesInRangeParams{
StartHeight: int64(height),