diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index d79357de3..71b1b201a 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -1,8 +1,10 @@ package graphdb import ( + "bytes" "cmp" "context" + "database/sql" "errors" "fmt" "net" @@ -55,6 +57,11 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, err) } + // 4) Migrate the Prune log. + if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil { + return fmt.Errorf("could not migrate prune log: %w", err) + } + log.Infof("Finished migration of the graph store from KV to SQL in %v", time.Since(t0)) @@ -506,6 +513,115 @@ func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries, return nil } +// migratePruneLog migrates the prune log from the KV backend to the SQL +// database. It iterates over each prune log entry in the KV store, inserts it +// into the SQL database, and then verifies that the entry was inserted +// correctly by fetching it back from the SQL database and comparing it to the +// original entry. +func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries) error { + + var ( + count uint64 + pruneTipHeight uint32 + pruneTipHash chainhash.Hash + ) + + // migrateSinglePruneEntry is a helper function that inserts a single + // prune log entry into the SQL database and verifies that it was + // inserted correctly. + migrateSinglePruneEntry := func(height uint32, + hash *chainhash.Hash) error { + + count++ + + // Keep track of the prune tip height and hash. + if height > pruneTipHeight { + pruneTipHeight = height + pruneTipHash = *hash + } + + err := sqlDB.UpsertPruneLogEntry( + ctx, sqlc.UpsertPruneLogEntryParams{ + BlockHeight: int64(height), + BlockHash: hash[:], + }, + ) + if err != nil { + return fmt.Errorf("unable to insert prune log "+ + "entry for height %d: %w", height, err) + } + + // Now, check that the entry was inserted correctly. + migratedHash, err := sqlDB.GetPruneHashByHeight( + ctx, int64(height), + ) + if err != nil { + return fmt.Errorf("could not get prune hash "+ + "for height %d: %w", height, err) + } + + return sqldb.CompareRecords( + hash[:], migratedHash, "prune log entry", + ) + } + + // Iterate over each prune log entry in the KV store and migrate it to + // the SQL database. + err := forEachPruneLogEntry( + kvBackend, func(height uint32, hash *chainhash.Hash) error { + err := migrateSinglePruneEntry(height, hash) + if err != nil { + return fmt.Errorf("could not migrate "+ + "prune log entry at height %d: %w", + height, err) + } + + return nil + }, + ) + if err != nil { + return fmt.Errorf("could not migrate prune log: %w", err) + } + + // Check that the prune tip is set correctly in the SQL + // database. + pruneTip, err := sqlDB.GetPruneTip(ctx) + if errors.Is(err, sql.ErrNoRows) { + // The ErrGraphNeverPruned error is expected if no prune log + // entries were migrated from the kvdb store. Otherwise, it's + // an unexpected error. + if count == 0 { + log.Infof("No prune log entries found in KV store " + + "to migrate") + return nil + } + // Fall-through to the next error check. + } + if err != nil { + return fmt.Errorf("could not get prune tip: %w", err) + } + + if pruneTip.BlockHeight != int64(pruneTipHeight) || + !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) { + + return fmt.Errorf("prune tip mismatch after migration: "+ + "expected height %d, hash %s; got height %d, "+ + "hash %s", pruneTipHeight, pruneTipHash, + pruneTip.BlockHeight, + chainhash.Hash(pruneTip.BlockHash)) + } + + log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+ + "tip is: height %d, hash: %s", count, pruneTipHeight, + pruneTipHash) + + return nil +} + +// getAndBuildChanAndPolicies is a helper that builds the channel edge info +// and policies from the given row returned by the SQL query +// GetChannelBySCIDWithPolicies. func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries, row sqlc.GetChannelBySCIDWithPoliciesRow, chain chainhash.Hash) (*models.ChannelEdgeInfo, @@ -542,3 +658,31 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries, return edge, policy1, policy2, nil } + +// forEachPruneLogEntry iterates over each prune log entry in the KV +// backend and calls the provided callback function for each entry. +func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32, + hash *chainhash.Hash) error) error { + + return kvdb.View(db, func(tx kvdb.RTx) error { + metaBucket := tx.ReadBucket(graphMetaBucket) + if metaBucket == nil { + return ErrGraphNotFound + } + + pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket) + if pruneBucket == nil { + // The graph has never been pruned and so, there are no + // entries to iterate over. + return nil + } + + return pruneBucket.ForEach(func(k, v []byte) error { + blockHeight := byteOrder.Uint32(k) + var blockHash chainhash.Hash + copy(blockHash[:], v) + + return cb(blockHeight, &blockHash) + }) + }, func() {}) +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index 6e57b24ac..b2b884261 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -6,6 +6,7 @@ import ( "bytes" "cmp" "context" + "crypto/rand" "errors" "fmt" "image/color" @@ -21,6 +22,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/graph/db/models" @@ -217,6 +219,45 @@ func TestMigrateGraphToSQL(t *testing.T) { numPolicies: 3, }, }, + { + name: "prune log", + write: func(t *testing.T, db *KVStore, object any) { + var hash chainhash.Hash + _, err := rand.Read(hash[:]) + require.NoError(t, err) + + switch obj := object.(type) { + case *models.LightningNode: + err = db.SetSourceNode(ctx, obj) + default: + height, ok := obj.(uint32) + require.True(t, ok) + + _, _, err = db.PruneGraph( + nil, &hash, height, + ) + } + require.NoError(t, err) + }, + objects: []any{ + // The PruneGraph call requires that the source + // node be set. So that is the first object + // we will write. + &models.LightningNode{ + HaveNodeAnnouncement: false, + PubKeyBytes: testPub, + }, + // Now we add some block heights to prune + // the graph at. + uint32(1), uint32(2), uint32(20), uint32(3), + uint32(4), + }, + expGraphStats: graphStats{ + numNodes: 1, + srcNodeSet: true, + pruneTip: 20, + }, + }, } for _, test := range tests { @@ -253,6 +294,7 @@ type graphStats struct { srcNodeSet bool numChannels int numPolicies int + pruneTip int } // assertInSync checks that the KVStore and SQLStore both contain the same @@ -276,6 +318,12 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore, require.Len(t, sqlChannels, stats.numChannels) require.Equal(t, stats.numPolicies, sqlChannels.CountPolicies()) require.Equal(t, fetchAllChannelsAndPolicies(t, kvDB), sqlChannels) + + // 4) Assert prune logs match. For this one, we iterate through the + // prune log of the kvdb store and check that the entries match the + // entries in the SQL store. Then we just do a final check to ensure + // that the prune tip also matches. + checkKVPruneLogEntries(t, kvDB, sqlDB, stats.pruneTip) } // fetchAllNodes retrieves all nodes from the given store and returns them @@ -379,6 +427,46 @@ func fetchAllChannelsAndPolicies(t *testing.T, store V1Store) chanSet { return channels } +// checkKVPruneLogEntries iterates through the prune log entries in the +// KVStore and checks that there is an entry for each in the SQLStore. It then +// does a final check to ensure that the prune tips in both stores match. +func checkKVPruneLogEntries(t *testing.T, kv *KVStore, sql *SQLStore, + expTip int) { + + // Iterate through the prune log entries in the KVStore and + // check that each entry exists in the SQLStore. + err := forEachPruneLogEntry( + kv.db, func(height uint32, hash *chainhash.Hash) error { + sqlHash, err := sql.db.GetPruneHashByHeight( + context.Background(), int64(height), + ) + require.NoError(t, err) + require.Equal(t, hash[:], sqlHash) + + return nil + }, + ) + require.NoError(t, err) + + kvPruneHash, kvPruneHeight, kvPruneErr := kv.PruneTip() + sqlPruneHash, sqlPruneHeight, sqlPruneErr := sql.PruneTip() + + // If the prune error is ErrGraphNeverPruned, then we expect + // the SQL prune error to also be ErrGraphNeverPruned. + if errors.Is(kvPruneErr, ErrGraphNeverPruned) { + require.ErrorIs(t, sqlPruneErr, ErrGraphNeverPruned) + return + } + + // Otherwise, we expect both prune errors to be nil and the + // prune hashes and heights to match. + require.NoError(t, kvPruneErr) + require.NoError(t, sqlPruneErr) + require.Equal(t, kvPruneHash[:], sqlPruneHash[:]) + require.Equal(t, kvPruneHeight, sqlPruneHeight) + require.Equal(t, expTip, int(sqlPruneHeight)) +} + // setUpKVStore initializes a new KVStore for testing. func setUpKVStore(t *testing.T) *KVStore { kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph") diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index a77709b8e..a42d4472d 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -135,6 +135,7 @@ type SQLQueries interface { Prune log table queries. */ GetPruneTip(ctx context.Context) (sqlc.PruneLog, error) + GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error) UpsertPruneLogEntry(ctx context.Context, arg sqlc.UpsertPruneLogEntryParams) error DeletePruneLogEntriesInRange(ctx context.Context, arg sqlc.DeletePruneLogEntriesInRangeParams) error diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 77b5ee555..d6535e806 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -1350,6 +1350,19 @@ func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByL return items, nil } +const getPruneHashByHeight = `-- name: GetPruneHashByHeight :one +SELECT block_hash +FROM prune_log +WHERE block_height = $1 +` + +func (q *Queries) GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error) { + row := q.db.QueryRowContext(ctx, getPruneHashByHeight, blockHeight) + var block_hash []byte + err := row.Scan(&block_hash) + return block_hash, err +} + const getPruneTip = `-- name: GetPruneTip :one SELECT block_height, block_hash FROM prune_log diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index b6f776a74..71fc4d281 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -63,6 +63,7 @@ type Querier interface { GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error) GetNodeIDByPubKey(ctx context.Context, arg GetNodeIDByPubKeyParams) (int64, error) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error) + GetPruneHashByHeight(ctx context.Context, blockHeight int64) ([]byte, error) GetPruneTip(ctx context.Context) (PruneLog, error) GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error) GetSCIDByOutpoint(ctx context.Context, arg GetSCIDByOutpointParams) ([]byte, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index f7c1ebbc9..ec43410c9 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -728,6 +728,11 @@ FROM prune_log ORDER BY block_height DESC LIMIT 1; +-- name: GetPruneHashByHeight :one +SELECT block_hash +FROM prune_log +WHERE block_height = $1; + -- name: DeletePruneLogEntriesInRange :exec DELETE FROM prune_log WHERE block_height >= @start_height