graph/db: migrate prune log

This commit expands `MigrateGraphToSQL` to include migration of the
prune log.
Elle Mouton
2025-07-02 12:06:29 +02:00
parent 63609b0801
commit 03ede9ccef
6 changed files with 252 additions and 0 deletions
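Before the per-file diffs, here is a condensed, self-contained sketch of the approach the new migration step takes: every (block height, block hash) pair is read from the kvdb prune log, upserted into the SQL prune_log table, read back to verify it round-trips, and the highest migrated height is tracked as the expected prune tip. The maps and names below are illustrative stand-ins only, not the actual lnd stores or APIs.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Stand-in for the kvdb prune log: block height -> 32-byte block hash.
	kvPruneLog := map[uint32][]byte{
		1:  bytes.Repeat([]byte{0x01}, 32),
		20: bytes.Repeat([]byte{0x02}, 32),
	}

	// Stand-in for the SQL prune_log table.
	sqlPruneLog := make(map[uint32][]byte)

	var pruneTipHeight uint32
	for height, hash := range kvPruneLog {
		// Upsert the entry into the destination store.
		sqlPruneLog[height] = hash

		// Verify that the entry round-trips before moving on.
		migrated, ok := sqlPruneLog[height]
		if !ok || !bytes.Equal(hash, migrated) {
			panic(fmt.Sprintf("prune entry mismatch at height %d",
				height))
		}

		// Track the prune tip (the highest pruned height) so it can
		// be compared against the destination's prune tip afterwards.
		if height > pruneTipHeight {
			pruneTipHeight = height
		}
	}

	fmt.Printf("migrated %d entries, prune tip height %d\n",
		len(sqlPruneLog), pruneTipHeight)
}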

View File

@@ -1,8 +1,10 @@
package graphdb

import (
	"bytes"
	"cmp"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net"
@@ -55,6 +57,11 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
			err)
	}

	// 4) Migrate the Prune log.
	if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))
@@ -506,6 +513,115 @@ func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var (
		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash
	)

	// migrateSinglePruneEntry is a helper function that inserts a single
	// prune log entry into the SQL database and verifies that it was
	// inserted correctly.
	migrateSinglePruneEntry := func(height uint32,
		hash *chainhash.Hash) error {

		count++

		// Keep track of the prune tip height and hash.
		if height > pruneTipHeight {
			pruneTipHeight = height
			pruneTipHash = *hash
		}

		err := sqlDB.UpsertPruneLogEntry(
			ctx, sqlc.UpsertPruneLogEntryParams{
				BlockHeight: int64(height),
				BlockHash:   hash[:],
			},
		)
		if err != nil {
			return fmt.Errorf("unable to insert prune log "+
				"entry for height %d: %w", height, err)
		}

		// Now, check that the entry was inserted correctly.
		migratedHash, err := sqlDB.GetPruneHashByHeight(
			ctx, int64(height),
		)
		if err != nil {
			return fmt.Errorf("could not get prune hash "+
				"for height %d: %w", height, err)
		}

		return sqldb.CompareRecords(
			hash[:], migratedHash, "prune log entry",
		)
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(
		kvBackend, func(height uint32, hash *chainhash.Hash) error {
			err := migrateSinglePruneEntry(height, hash)
			if err != nil {
				return fmt.Errorf("could not migrate "+
					"prune log entry at height %d: %w",
					height, err)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}
	// Check that the prune tip is set correctly in the SQL database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// A sql.ErrNoRows error is expected if no prune log entries
		// were migrated from the kvdb store. Otherwise, it's an
		// unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")

			return nil
		}

		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
		"tip is: height %d, hash: %s", count, pruneTipHeight,
		pruneTipHash)

	return nil
}
// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
	row sqlc.GetChannelBySCIDWithPoliciesRow,
	chain chainhash.Hash) (*models.ChannelEdgeInfo,
@@ -542,3 +658,31 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
	return edge, policy1, policy2, nil
}
// forEachPruneLogEntry iterates over each prune log entry in the KV backend
// and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
	hash *chainhash.Hash) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		metaBucket := tx.ReadBucket(graphMetaBucket)
		if metaBucket == nil {
			return ErrGraphNotFound
		}

		pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
		if pruneBucket == nil {
			// The graph has never been pruned and so, there are
			// no entries to iterate over.
			return nil
		}

		return pruneBucket.ForEach(func(k, v []byte) error {
			blockHeight := byteOrder.Uint32(k)
			var blockHash chainhash.Hash
			copy(blockHash[:], v)

			return cb(blockHeight, &blockHash)
		})
	}, func() {})
}
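For reference, forEachPruneLogEntry above relies on the layout of the prune log bucket: each key is a serialized block height and each value is the raw block hash of the block the graph was pruned at. Below is a minimal, self-contained sketch of that key/value encoding, assuming the package's byteOrder is big-endian; the decoding half mirrors the byteOrder.Uint32 and copy calls in the function above.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// A single prune log entry: block height 20 with a placeholder
	// 32-byte block hash.
	var (
		height uint32 = 20
		hash   [32]byte
	)

	// Key: the block height serialized as a 4-byte integer (assumed
	// big-endian, matching byteOrder in the graph package).
	var key [4]byte
	binary.BigEndian.PutUint32(key[:], height)

	// Value: the raw 32-byte block hash.
	value := hash[:]

	// Decoding mirrors forEachPruneLogEntry: read the height back from
	// the key and copy the value into a hash array.
	decodedHeight := binary.BigEndian.Uint32(key[:])
	var decodedHash [32]byte
	copy(decodedHash[:], value)

	fmt.Println(decodedHeight, len(decodedHash))
}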

View File

@@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"cmp" "cmp"
"context" "context"
"crypto/rand"
"errors" "errors"
"fmt" "fmt"
"image/color" "image/color"
@@ -21,6 +22,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog/v2" "github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/graph/db/models"
@@ -217,6 +219,45 @@ func TestMigrateGraphToSQL(t *testing.T) {
				numPolicies: 3,
			},
		},
		{
			name: "prune log",
			write: func(t *testing.T, db *KVStore, object any) {
				var hash chainhash.Hash
				_, err := rand.Read(hash[:])
				require.NoError(t, err)

				switch obj := object.(type) {
				case *models.LightningNode:
					err = db.SetSourceNode(ctx, obj)
				default:
					height, ok := obj.(uint32)
					require.True(t, ok)

					_, _, err = db.PruneGraph(
						nil, &hash, height,
					)
				}
				require.NoError(t, err)
			},
			objects: []any{
				// The PruneGraph call requires that the source
				// node be set. So that is the first object we
				// will write.
				&models.LightningNode{
					HaveNodeAnnouncement: false,
					PubKeyBytes:          testPub,
				},

				// Now we add some block heights to prune the
				// graph at.
				uint32(1), uint32(2), uint32(20), uint32(3),
				uint32(4),
			},
			expGraphStats: graphStats{
				numNodes:   1,
				srcNodeSet: true,
				pruneTip:   20,
			},
		},
	}

	for _, test := range tests {
@@ -253,6 +294,7 @@ type graphStats struct {
	srcNodeSet  bool
	numChannels int
	numPolicies int
	pruneTip    int
}
// assertInSync checks that the KVStore and SQLStore both contain the same
@@ -276,6 +318,12 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore,
	require.Len(t, sqlChannels, stats.numChannels)
	require.Equal(t, stats.numPolicies, sqlChannels.CountPolicies())
	require.Equal(t, fetchAllChannelsAndPolicies(t, kvDB), sqlChannels)

	// 4) Assert prune logs match. For this one, we iterate through the
	// prune log of the kvdb store and check that the entries match the
	// entries in the SQL store. Then we just do a final check to ensure
	// that the prune tip also matches.
	checkKVPruneLogEntries(t, kvDB, sqlDB, stats.pruneTip)
}

// fetchAllNodes retrieves all nodes from the given store and returns them
@@ -379,6 +427,46 @@ func fetchAllChannelsAndPolicies(t *testing.T, store V1Store) chanSet {
	return channels
}

// checkKVPruneLogEntries iterates through the prune log entries in the
// KVStore and checks that there is an entry for each in the SQLStore. It then
// does a final check to ensure that the prune tips in both stores match.
func checkKVPruneLogEntries(t *testing.T, kv *KVStore, sql *SQLStore,
	expTip int) {

	// Iterate through the prune log entries in the KVStore and check that
	// each entry exists in the SQLStore.
	err := forEachPruneLogEntry(
		kv.db, func(height uint32, hash *chainhash.Hash) error {
			sqlHash, err := sql.db.GetPruneHashByHeight(
				context.Background(), int64(height),
			)
			require.NoError(t, err)
			require.Equal(t, hash[:], sqlHash)

			return nil
		},
	)
	require.NoError(t, err)

	kvPruneHash, kvPruneHeight, kvPruneErr := kv.PruneTip()
	sqlPruneHash, sqlPruneHeight, sqlPruneErr := sql.PruneTip()

	// If the prune error is ErrGraphNeverPruned, then we expect the SQL
	// prune error to also be ErrGraphNeverPruned.
	if errors.Is(kvPruneErr, ErrGraphNeverPruned) {
		require.ErrorIs(t, sqlPruneErr, ErrGraphNeverPruned)
		return
	}

	// Otherwise, we expect both prune errors to be nil and the prune
	// hashes and heights to match.
	require.NoError(t, kvPruneErr)
	require.NoError(t, sqlPruneErr)
	require.Equal(t, kvPruneHash[:], sqlPruneHash[:])
	require.Equal(t, kvPruneHeight, sqlPruneHeight)
	require.Equal(t, expTip, int(sqlPruneHeight))
}
// setUpKVStore initializes a new KVStore for testing.
func setUpKVStore(t *testing.T) *KVStore {
	kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph")

View File

@@ -135,6 +135,7 @@ type SQLQueries interface {
		Prune log table queries.
	*/
	GetPruneTip(ctx context.Context) (sqlc.PruneLog, error)
	GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error)
	UpsertPruneLogEntry(ctx context.Context, arg sqlc.UpsertPruneLogEntryParams) error
	DeletePruneLogEntriesInRange(ctx context.Context, arg sqlc.DeletePruneLogEntriesInRangeParams) error

View File

@@ -1350,6 +1350,19 @@ func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByL
	return items, nil
}

const getPruneHashByHeight = `-- name: GetPruneHashByHeight :one
SELECT block_hash
FROM prune_log
WHERE block_height = $1
`

func (q *Queries) GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error) {
	row := q.db.QueryRowContext(ctx, getPruneHashByHeight, blockHeight)
	var block_hash []byte
	err := row.Scan(&block_hash)
	return block_hash, err
}

const getPruneTip = `-- name: GetPruneTip :one
SELECT block_height, block_hash
FROM prune_log

View File

@@ -63,6 +63,7 @@ type Querier interface {
	GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error)
	GetNodeIDByPubKey(ctx context.Context, arg GetNodeIDByPubKeyParams) (int64, error)
	GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error)
	GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error)
	GetPruneTip(ctx context.Context) (PruneLog, error)
	GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error)
	GetSCIDByOutpoint(ctx context.Context, arg GetSCIDByOutpointParams) ([]byte, error)

View File

@@ -728,6 +728,11 @@ FROM prune_log
ORDER BY block_height DESC
LIMIT 1;

-- name: GetPruneHashByHeight :one
SELECT block_hash
FROM prune_log
WHERE block_height = $1;

-- name: DeletePruneLogEntriesInRange :exec
DELETE FROM prune_log
WHERE block_height >= @start_height