graph/db+sqldb: validate prune log migration using batching

As was done in the previous commits for nodes & channels, we update the
migratePruneLog function here so that it validates migrated entries in
batches rather than one-by-one.
Author: Elle Mouton
Date: 2025-08-13 13:50:20 +02:00
Parent: 81c54611c1
Commit: 8554f17b3f
5 changed files with 135 additions and 41 deletions
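The flow the commit message describes can be distilled into the following standalone Go sketch. It is illustrative only: the real code in the diff below works with chainhash.Hash values, a sqldb.QueryConfig, and the GetPruneEntriesForHeights query, whereas this sketch uses plain [32]byte hashes and a generic fetch callback standing in for the batch query, and it covers only the validation side (inserts stay one-by-one in the diff as well).

package example

import "fmt"

// validateInBatches illustrates the pattern introduced by this commit:
// migrated entries are accumulated in a map keyed by block height and
// verified against the target store one batch at a time instead of with a
// lookup per entry.
func validateInBatches(entries map[uint32][32]byte, batchSize int,
    fetch func(heights []int64) (map[uint32][32]byte, error)) error {

    batch := make(map[uint32][32]byte, batchSize)

    // flush verifies the current batch with a single fetch call and then
    // resets it for the next chunk.
    flush := func() error {
        if len(batch) == 0 {
            return nil
        }

        heights := make([]int64, 0, len(batch))
        for height := range batch {
            heights = append(heights, int64(height))
        }

        got, err := fetch(heights)
        if err != nil {
            return err
        }
        if len(got) != len(batch) {
            return fmt.Errorf("expected %d entries, got %d",
                len(batch), len(got))
        }

        for height, want := range batch {
            if got[height] != want {
                return fmt.Errorf("hash mismatch at height %d",
                    height)
            }
        }

        batch = make(map[uint32][32]byte, batchSize)

        return nil
    }

    for height, hash := range entries {
        batch[height] = hash

        if len(batch) >= batchSize {
            if err := flush(); err != nil {
                return err
            }
        }
    }

    // Verify whatever is left in the final, partially filled batch.
    return flush()
}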


@@ -61,7 +61,8 @@ func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
}
// 4) Migrate the Prune log.
if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
err = migratePruneLog(ctx, cfg.QueryCfg, kvBackend, sqlDB)
if err != nil {
return fmt.Errorf("could not migrate prune log: %w", err)
}
@@ -794,12 +795,11 @@ func validateMigratedChannelWithBatchData(cfg *SQLStoreConfig,
}
// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
sqlDB SQLQueries) error {
// database. It collects entries in batches, inserts them individually, and then
// validates them in batches using GetPruneEntriesForHeights for better
// performance.
func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
kvBackend kvdb.Backend, sqlDB SQLQueries) error {
var (
count uint64
@@ -813,12 +813,61 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
}
)
// migrateSinglePruneEntry is a helper function that inserts a single
// prune log entry into the SQL database and verifies that it was
// inserted correctly.
migrateSinglePruneEntry := func(height uint32,
hash *chainhash.Hash) error {
batch := make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)
// validateBatch validates a batch of prune entries using a batch query.
validateBatch := func() error {
if len(batch) == 0 {
return nil
}
// Extract heights for the batch query.
heights := make([]int64, 0, len(batch))
for height := range batch {
heights = append(heights, int64(height))
}
// Batch fetch all entries from the database.
rows, err := sqlDB.GetPruneEntriesForHeights(ctx, heights)
if err != nil {
return fmt.Errorf("could not batch get prune "+
"entries: %w", err)
}
if len(rows) != len(batch) {
return fmt.Errorf("expected to fetch %d prune "+
"entries, but got %d", len(batch),
len(rows))
}
// Validate each entry in the batch.
for _, row := range rows {
kvdbHash, ok := batch[uint32(row.BlockHeight)]
if !ok {
return fmt.Errorf("prune entry for height %d "+
"not found in batch", row.BlockHeight)
}
err := sqldb.CompareRecords(
kvdbHash[:], row.BlockHash,
fmt.Sprintf("prune log entry at height %d",
row.BlockHeight),
)
if err != nil {
return err
}
}
// Reset the batch map for the next iteration.
batch = make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)
return nil
}
// Iterate over each prune log entry in the KV store and migrate it to
// the SQL database.
err := forEachPruneLogEntry(
kvBackend, func(height uint32, hash *chainhash.Hash) error {
count++
chunk++
@@ -828,6 +877,7 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
pruneTipHash = *hash
}
// Insert the entry (individual inserts for now).
err := sqlDB.UpsertPruneLogEntry(
ctx, sqlc.UpsertPruneLogEntryParams{
BlockHeight: int64(height),
@@ -839,29 +889,16 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
"entry for height %d: %w", height, err)
}
// Now, check that the entry was inserted correctly.
migratedHash, err := sqlDB.GetPruneHashByHeight(
ctx, int64(height),
)
if err != nil {
return fmt.Errorf("could not get prune hash "+
"for height %d: %w", height, err)
}
// Add to validation batch.
batch[height] = *hash
return sqldb.CompareRecords(
hash[:], migratedHash, "prune log entry",
)
}
// Iterate over each prune log entry in the KV store and migrate it to
// the SQL database.
err := forEachPruneLogEntry(
kvBackend, func(height uint32, hash *chainhash.Hash) error {
err := migrateSinglePruneEntry(height, hash)
// Validate batch when full.
if len(batch) >= cfg.MaxBatchSize {
err := validateBatch()
if err != nil {
return fmt.Errorf("could not migrate "+
"prune log entry at height %d: %w",
height, err)
return fmt.Errorf("batch "+
"validation failed: %w", err)
}
}
s.Do(func() {
@@ -882,6 +919,15 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
return fmt.Errorf("could not migrate prune log: %w", err)
}
// Validate any remaining entries in the batch.
if len(batch) > 0 {
err := validateBatch()
if err != nil {
return fmt.Errorf("final batch validation failed: %w",
err)
}
}
// Check that the prune tip is set correctly in the SQL
// database.
pruneTip, err := sqlDB.GetPruneTip(ctx)
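The hunk above is cut off right after the GetPruneTip call. As a rough sketch of the cross-check that follows it (the helper name checkPruneTip is invented here, and the real code may instead use sqldb.CompareRecords and different error text), the prune tip reported by the SQL store should equal the highest height and hash tracked while iterating the KV store. The sketch assumes the bytes and fmt imports plus the sqlc and chainhash packages already used by the migration file:

// checkPruneTip is an illustrative helper, not lnd's actual code: after the
// migration, the SQL store's prune tip must match the highest prune log
// entry seen in the KV store (pruneTipHeight/pruneTipHash in the diff).
func checkPruneTip(pruneTip sqlc.GraphPruneLog, kvHeight uint32,
    kvHash chainhash.Hash) error {

    if pruneTip.BlockHeight != int64(kvHeight) {
        return fmt.Errorf("prune tip height mismatch: kvdb=%d, sql=%d",
            kvHeight, pruneTip.BlockHeight)
    }

    if !bytes.Equal(pruneTip.BlockHash, kvHash[:]) {
        return fmt.Errorf("prune tip hash mismatch at height %d",
            kvHeight)
    }

    return nil
}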


@@ -142,6 +142,7 @@ type SQLQueries interface {
*/
GetPruneTip(ctx context.Context) (sqlc.GraphPruneLog, error)
GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error)
GetPruneEntriesForHeights(ctx context.Context, heights []int64) ([]sqlc.GraphPruneLog, error)
UpsertPruneLogEntry(ctx context.Context, arg sqlc.UpsertPruneLogEntryParams) error
DeletePruneLogEntriesInRange(ctx context.Context, arg sqlc.DeletePruneLogEntriesInRangeParams) error


@@ -1992,6 +1992,46 @@ func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByL
return items, nil
}
const getPruneEntriesForHeights = `-- name: GetPruneEntriesForHeights :many
SELECT block_height, block_hash
FROM graph_prune_log
WHERE block_height
IN (/*SLICE:heights*/?)
`
func (q *Queries) GetPruneEntriesForHeights(ctx context.Context, heights []int64) ([]GraphPruneLog, error) {
query := getPruneEntriesForHeights
var queryParams []interface{}
if len(heights) > 0 {
for _, v := range heights {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:heights*/?", makeQueryParams(len(queryParams), len(heights)), 1)
} else {
query = strings.Replace(query, "/*SLICE:heights*/?", "NULL", 1)
}
rows, err := q.db.QueryContext(ctx, query, queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GraphPruneLog
for rows.Next() {
var i GraphPruneLog
if err := rows.Scan(&i.BlockHeight, &i.BlockHash); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getPruneHashByHeight = `-- name: GetPruneHashByHeight :one
SELECT block_hash
FROM graph_prune_log
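database/sql cannot bind a Go slice to a single placeholder, so the generated GetPruneEntriesForHeights above textually expands the /*SLICE:heights*/? marker into one placeholder per element (via the repo's makeQueryParams helper, which presumably also handles numbered $N placeholders where the driver needs them) and passes each height as its own bound argument. Below is a standalone sketch of the same technique with plain ?-style placeholders; the NULL substitution for an empty slice mirrors the generated code:

package example

import "strings"

// expandHeightsPlaceholder mimics what the generated query code does with
// the IN (/*SLICE:heights*/?) marker: one placeholder per slice element, or
// NULL when the slice is empty.
func expandHeightsPlaceholder(query string,
    heights []int64) (string, []interface{}) {

    if len(heights) == 0 {
        return strings.Replace(
            query, "/*SLICE:heights*/?", "NULL", 1,
        ), nil
    }

    args := make([]interface{}, 0, len(heights))
    placeholders := make([]string, 0, len(heights))
    for _, h := range heights {
        args = append(args, h)
        placeholders = append(placeholders, "?")
    }

    expanded := strings.Replace(
        query, "/*SLICE:heights*/?",
        strings.Join(placeholders, ","), 1,
    )

    return expanded, args
}

// With heights = []int64{100, 101, 102} the executed query becomes
//
//	SELECT block_height, block_hash
//	FROM graph_prune_log
//	WHERE block_height IN (?,?,?)
//
// with the three heights passed as the bound arguments.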


@@ -71,6 +71,7 @@ type Querier interface {
GetNodeIDByPubKey(ctx context.Context, arg GetNodeIDByPubKeyParams) (int64, error)
GetNodesByIDs(ctx context.Context, ids []int64) ([]GraphNode, error)
GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]GraphNode, error)
GetPruneEntriesForHeights(ctx context.Context, heights []int64) ([]GraphPruneLog, error)
GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error)
GetPruneTip(ctx context.Context) (GraphPruneLog, error)
GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]GraphChannel, error)


@@ -952,6 +952,12 @@ FROM graph_prune_log
ORDER BY block_height DESC
LIMIT 1;
-- name: GetPruneEntriesForHeights :many
SELECT block_height, block_hash
FROM graph_prune_log
WHERE block_height
IN (sqlc.slice('heights')/*SLICE:heights*/);
-- name: GetPruneHashByHeight :one
SELECT block_hash
FROM graph_prune_log
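For completeness, a hedged usage example of the new GetPruneEntriesForHeights query from Go via the generated querier shown earlier: db stands for any implementation of the SQLQueries/Querier interface, ctx for an available context.Context, and the heights are arbitrary.

heights := []int64{100, 101, 102}

rows, err := db.GetPruneEntriesForHeights(ctx, heights)
if err != nil {
    return fmt.Errorf("could not fetch prune entries: %w", err)
}

for _, row := range rows {
    fmt.Printf("height=%d hash=%x\n", row.BlockHeight, row.BlockHash)
}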