From c9a775e7fb0379f021723e79364988862c1ea480 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 2 Jul 2025 09:22:47 +0200 Subject: [PATCH 1/8] graph/db: fix build flag directive Fix a typo in the build directive. --- graph/db/test_sqlite.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/db/test_sqlite.go b/graph/db/test_sqlite.go index 76b59c84b..2af105259 100644 --- a/graph/db/test_sqlite.go +++ b/graph/db/test_sqlite.go @@ -1,4 +1,4 @@ -//go:build !test_db_posgres && test_db_sqlite +//go:build !test_db_postgres && test_db_sqlite package graphdb From 5d7abcdf03f7f69afd04d5dfdc7dc701933c55eb Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 2 Jul 2025 09:27:18 +0200 Subject: [PATCH 2/8] graph/db: refactor SQL DB creation files Factor out the transaction executor construction so that we can have access to the raw BatchedSQLQueries type from within tests. --- graph/db/test_postgres.go | 19 ++++--------------- graph/db/test_sql.go | 23 +++++++++++++++++++++++ graph/db/test_sqlite.go | 19 ++++--------------- 3 files changed, 31 insertions(+), 30 deletions(-) create mode 100644 graph/db/test_sql.go diff --git a/graph/db/test_postgres.go b/graph/db/test_postgres.go index 8067756e2..7af420d19 100644 --- a/graph/db/test_postgres.go +++ b/graph/db/test_postgres.go @@ -6,14 +6,12 @@ import ( "database/sql" "testing" - "github.com/btcsuite/btcd/chaincfg" "github.com/lightningnetwork/lnd/sqldb" - "github.com/stretchr/testify/require" ) -// NewTestDB is a helper function that creates a SQLStore backed by a postgres -// database for testing. -func NewTestDB(t testing.TB) V1Store { +// newBatchQuerier creates a new BatchedSQLQueries instance for testing +// using a PostgreSQL database fixture. +func newBatchQuerier(t testing.TB) BatchedSQLQueries { pgFixture := sqldb.NewTestPgFixture( t, sqldb.DefaultPostgresFixtureLifetime, ) @@ -23,18 +21,9 @@ func NewTestDB(t testing.TB) V1Store { db := sqldb.NewTestPostgresDB(t, pgFixture).BaseDB - executor := sqldb.NewTransactionExecutor( + return sqldb.NewTransactionExecutor( db, func(tx *sql.Tx) SQLQueries { return db.WithTx(tx) }, ) - - store, err := NewSQLStore( - &SQLStoreConfig{ - ChainHash: *chaincfg.MainNetParams.GenesisHash, - }, executor, - ) - require.NoError(t, err) - - return store } diff --git a/graph/db/test_sql.go b/graph/db/test_sql.go new file mode 100644 index 000000000..9d4d507b3 --- /dev/null +++ b/graph/db/test_sql.go @@ -0,0 +1,23 @@ +//go:build test_db_postgres || test_db_sqlite + +package graphdb + +import ( + "testing" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/stretchr/testify/require" +) + +// NewTestDB is a helper function that creates a SQLStore backed by a SQL +// database for testing. +func NewTestDB(t testing.TB) V1Store { + store, err := NewSQLStore( + &SQLStoreConfig{ + ChainHash: *chaincfg.MainNetParams.GenesisHash, + }, newBatchQuerier(t), + ) + require.NoError(t, err) + + return store +} diff --git a/graph/db/test_sqlite.go b/graph/db/test_sqlite.go index 2af105259..35f7cb5d8 100644 --- a/graph/db/test_sqlite.go +++ b/graph/db/test_sqlite.go @@ -6,28 +6,17 @@ import ( "database/sql" "testing" - "github.com/btcsuite/btcd/chaincfg" "github.com/lightningnetwork/lnd/sqldb" - "github.com/stretchr/testify/require" ) -// NewTestDB is a helper function that creates a SQLStore backed by a sqlite -// database for testing. -func NewTestDB(t testing.TB) V1Store { +// newBatchQuerier creates a new BatchedSQLQueries instance for testing +// using a SQLite database. +func newBatchQuerier(t testing.TB) BatchedSQLQueries { db := sqldb.NewTestSqliteDB(t).BaseDB - executor := sqldb.NewTransactionExecutor( + return sqldb.NewTransactionExecutor( db, func(tx *sql.Tx) SQLQueries { return db.WithTx(tx) }, ) - - store, err := NewSQLStore( - &SQLStoreConfig{ - ChainHash: *chaincfg.MainNetParams.GenesisHash, - }, executor, - ) - require.NoError(t, err) - - return store } From e3572e77c59035b6181dad4391980c89f399eea2 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 2 Jul 2025 09:47:52 +0200 Subject: [PATCH 3/8] graph/db: add SQL migration function and test framework In this commit, the basic framework for the graph SQL migration is added. This sets us up for the commits to follow which will add migration logic for each table in the graph commit by commit. --- graph/db/sql_migration.go | 62 +++++++++++++++++++++++++ graph/db/sql_migration_test.go | 82 ++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 graph/db/sql_migration.go create mode 100644 graph/db/sql_migration_test.go diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go new file mode 100644 index 000000000..67607bbb9 --- /dev/null +++ b/graph/db/sql_migration.go @@ -0,0 +1,62 @@ +package graphdb + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/kvdb" +) + +// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL +// backend. +// +// NOTE: this is currently not called from any code path. It is called via tests +// only for now and will be called from the main lnd binary once the +// migration is fully implemented and tested. +func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend, + _ SQLQueries, _ chainhash.Hash) error { + + log.Infof("Starting migration of the graph store from KV to SQL") + t0 := time.Now() + + // Check if there is a graph to migrate. + graphExists, err := checkGraphExists(kvBackend) + if err != nil { + return fmt.Errorf("failed to check graph existence: %w", err) + } + if !graphExists { + log.Infof("No graph found in KV store, skipping the migration") + return nil + } + + log.Infof("Finished migration of the graph store from KV to SQL in %v", + time.Since(t0)) + + return nil +} + +// checkGraphExists checks if the graph exists in the KV backend. +func checkGraphExists(db kvdb.Backend) (bool, error) { + // Check if there is even a graph to migrate. + err := db.View(func(tx kvdb.RTx) error { + // Check for the existence of the node bucket which is a top + // level bucket that would have been created on the initial + // creation of the graph store. + nodes := tx.ReadBucket(nodeBucket) + if nodes == nil { + return ErrGraphNotFound + } + + return nil + }, func() {}) + if errors.Is(err, ErrGraphNotFound) { + return false, nil + } else if err != nil { + return false, err + } + + return true, nil +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go new file mode 100644 index 000000000..e21eb2002 --- /dev/null +++ b/graph/db/sql_migration_test.go @@ -0,0 +1,82 @@ +//go:build test_db_postgres || test_db_sqlite + +package graphdb + +import ( + "context" + "testing" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/require" +) + +var testChain = *chaincfg.MainNetParams.GenesisHash + +// TestMigrateGraphToSQL tests various deterministic cases that we want to test +// for to ensure that our migration from a graph store backed by a KV DB to a +// SQL database works as expected. At the end of each test, the DBs are compared +// and expected to have the exact same data in them. +func TestMigrateGraphToSQL(t *testing.T) { + t.Parallel() + ctx := context.Background() + + tests := []struct { + name string + write func(t *testing.T, db *KVStore, object any) + objects []any + expGraphStats graphStats + }{ + { + name: "empty", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // Set up our source kvdb DB. + kvDB := setUpKVStore(t) + + // Write the test objects to the kvdb store. + for _, object := range test.objects { + test.write(t, kvDB, object) + } + + // Set up our destination SQL DB. + sql, ok := NewTestDB(t).(*SQLStore) + require.True(t, ok) + + // Run the migration. + err := MigrateGraphToSQL( + ctx, kvDB.db, sql.db, testChain, + ) + require.NoError(t, err) + + // Validate that the two databases are now in sync. + assertInSync(t, kvDB, sql, test.expGraphStats) + }) + } +} + +// graphStats holds expected statistics about the graph after migration. +type graphStats struct { +} + +// assertInSync checks that the KVStore and SQLStore both contain the same +// graph data after migration. +func assertInSync(_ *testing.T, _ *KVStore, _ *SQLStore, stats graphStats) { +} + +// setUpKVStore initializes a new KVStore for testing. +func setUpKVStore(t *testing.T) *KVStore { + kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph") + require.NoError(t, err) + t.Cleanup(cleanup) + + kvStore, err := NewKVStore(kvDB) + require.NoError(t, err) + + return kvStore +} From 5c09652e1a8aec53a6521ec3d2d816941d7b5082 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 4 Jul 2025 10:18:46 +0200 Subject: [PATCH 4/8] graph/db: add dev migration test helper This commit was inspired by the invoices package TestMigrationWithChannelDB test helper. This test is purely for running locally to test the graph SQL migration. It allows a dev to run the migration against a local `channel.sqlite` or even `channel.db` file. --- graph/db/sql_migration_test.go | 127 +++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index e21eb2002..10e070c0f 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -4,10 +4,18 @@ package graphdb import ( "context" + "os" + "path" + "strings" "testing" + "time" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/kvdb/sqlbase" + "github.com/lightningnetwork/lnd/kvdb/sqlite" + "github.com/lightningnetwork/lnd/sqldb" "github.com/stretchr/testify/require" ) @@ -80,3 +88,122 @@ func setUpKVStore(t *testing.T) *KVStore { return kvStore } + +// TestMigrationWithChannelDB tests the migration of the graph store from a +// bolt backed channel.db or a kvdb channel.sqlite to a SQL database. Note that +// this test does not attempt to be a complete migration test for all graph +// store types but rather is added as a tool for developers and users to debug +// graph migration issues with an actual channel.db/channel.sqlite file. +// +// NOTE: To use this test, place either of those files in the graph/db/testdata +// directory, uncomment the "Skipf" line, and set "chain" variable appropriately +// and set the "fileName" variable to the name of the channel database file you +// want to use for the migration test. +func TestMigrationWithChannelDB(t *testing.T) { + ctx := context.Background() + + // NOTE: comment this line out to run the test. + t.Skipf("skipping test meant for local debugging only") + + // NOTE: set this to the genesis hash of the chain that the store + // was created on. + chain := *chaincfg.MainNetParams.GenesisHash + + // NOTE: set this to the name of the channel database file you want + // to use for the migration test. This may be either a bbolt ".db" file + // or a SQLite ".sqlite" file. If you want to migrate from a + // bbolt channel.db file, set this to "channel.db". + const fileName = "channel.sqlite" + + // Set up logging for the test. + UseLogger(btclog.NewSLogger(btclog.NewDefaultHandler(os.Stdout))) + + // migrate runs the migration from the kvdb store to the SQL store. + migrate := func(t *testing.T, kvBackend kvdb.Backend) { + graphStore := newBatchQuerier(t) + + err := graphStore.ExecTx( + ctx, sqldb.WriteTxOpt(), func(tx SQLQueries) error { + return MigrateGraphToSQL( + ctx, kvBackend, tx, chain, + ) + }, sqldb.NoOpReset, + ) + require.NoError(t, err) + } + + connectBBolt := func(t *testing.T, dbPath string) kvdb.Backend { + cfg := &kvdb.BoltBackendConfig{ + DBPath: dbPath, + DBFileName: fileName, + NoFreelistSync: true, + AutoCompact: false, + AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, + DBTimeout: kvdb.DefaultDBTimeout, + } + + kvStore, err := kvdb.GetBoltBackend(cfg) + require.NoError(t, err) + + return kvStore + } + + connectSQLite := func(t *testing.T, dbPath string) kvdb.Backend { + const ( + timeout = 10 * time.Second + maxConns = 50 + ) + sqlbase.Init(maxConns) + + cfg := &sqlite.Config{ + Timeout: timeout, + BusyTimeout: timeout, + MaxConnections: maxConns, + } + + kvStore, err := kvdb.Open( + kvdb.SqliteBackendName, ctx, cfg, + dbPath, fileName, + // NOTE: we use the raw string here else we get an + // import cycle if we try to import lncfg.NSChannelDB. + "channeldb", + ) + require.NoError(t, err) + + return kvStore + } + + tests := []struct { + name string + dbPath string + }{ + { + name: "empty", + dbPath: t.TempDir(), + }, + { + name: "testdata", + dbPath: "testdata", + }, + } + + // Determine if we are using a SQLite file or a Bolt DB file. + var isSqlite bool + if strings.HasSuffix(fileName, ".sqlite") { + isSqlite = true + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + chanDBPath := path.Join(test.dbPath, fileName) + t.Logf("Connecting to channel DB at: %s", chanDBPath) + + connectDB := connectBBolt + if isSqlite { + connectDB = connectSQLite + } + + migrate(t, connectDB(t, test.dbPath)) + }) + } +} From 084563c5c0564497a06a574309d640f7adacdb73 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 7 Jul 2025 13:43:11 +0200 Subject: [PATCH 5/8] sqldb+invoices: create a re-usable comparion helper function So that we can use it for comparisons in other migrations too. --- go.mod | 2 +- invoices/sql_migration.go | 29 +++-------------------------- sqldb/go.mod | 4 ++-- sqldb/migrations.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 4a5beee62..1ba1309e6 100644 --- a/go.mod +++ b/go.mod @@ -140,7 +140,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.12 // indirect github.com/ory/dockertest/v3 v3.10.0 // indirect - github.com/pmezard/go-difflib v1.0.0 + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect diff --git a/invoices/sql_migration.go b/invoices/sql_migration.go index 2ee41fd41..7a0791e25 100644 --- a/invoices/sql_migration.go +++ b/invoices/sql_migration.go @@ -6,17 +6,14 @@ import ( "encoding/binary" "errors" "fmt" - "reflect" "strconv" "time" - "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/sqldb" "github.com/lightningnetwork/lnd/sqldb/sqlc" - "github.com/pmezard/go-difflib/difflib" "golang.org/x/time/rate" ) @@ -53,11 +50,6 @@ var ( // // addIndexNo => invoiceKey addIndexBucket = []byte("invoice-add-index") - - // ErrMigrationMismatch is returned when the migrated invoice does not - // match the original invoice. - ErrMigrationMismatch = fmt.Errorf("migrated invoice does not match " + - "original invoice") ) // createInvoiceHashIndex generates a hash index that contains payment hashes @@ -548,24 +540,9 @@ func migrateInvoices(ctx context.Context, tx *sqlc.Queries, // Override the add index before checking for equality. migratedInvoice.AddIndex = invoice.AddIndex - if !reflect.DeepEqual(invoice, *migratedInvoice) { - diff := difflib.UnifiedDiff{ - A: difflib.SplitLines( - spew.Sdump(invoice), - ), - B: difflib.SplitLines( - spew.Sdump(migratedInvoice), - ), - FromFile: "Expected", - FromDate: "", - ToFile: "Actual", - ToDate: "", - Context: 3, - } - diffText, _ := difflib.GetUnifiedDiffString(diff) - - return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch, - paymentHash, diffText) + err = sqldb.CompareRecords(invoice, *migratedInvoice, "invoice") + if err != nil { + return err } } diff --git a/sqldb/go.mod b/sqldb/go.mod index f1c64416f..52846e151 100644 --- a/sqldb/go.mod +++ b/sqldb/go.mod @@ -2,11 +2,13 @@ module github.com/lightningnetwork/lnd/sqldb require ( github.com/btcsuite/btclog/v2 v2.0.1-0.20250602222548-9967d19bb084 + github.com/davecgh/go-spew v1.1.1 github.com/golang-migrate/migrate/v4 v4.17.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jackc/pgx/v5 v5.3.1 github.com/ory/dockertest/v3 v3.10.0 + github.com/pmezard/go-difflib v1.0.0 github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 modernc.org/sqlite v1.29.10 @@ -19,7 +21,6 @@ require ( github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/containerd/continuity v0.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/cli v20.10.17+incompatible // indirect github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect @@ -45,7 +46,6 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.5 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sirupsen/logrus v1.9.2 // indirect diff --git a/sqldb/migrations.go b/sqldb/migrations.go index 53d71e390..98c63734d 100644 --- a/sqldb/migrations.go +++ b/sqldb/migrations.go @@ -9,14 +9,17 @@ import ( "io" "io/fs" "net/http" + "reflect" "strings" "time" "github.com/btcsuite/btclog/v2" + "github.com/davecgh/go-spew/spew" "github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/source/httpfs" "github.com/lightningnetwork/lnd/sqldb/sqlc" + "github.com/pmezard/go-difflib/difflib" ) var ( @@ -71,6 +74,11 @@ var ( // user if necessary. }, }, migrationAdditions...) + + // ErrMigrationMismatch is returned when a migrated record does not + // match the original record. + ErrMigrationMismatch = fmt.Errorf("migrated record does not match " + + "original record") ) // MigrationConfig is a configuration struct that describes SQL migrations. Each @@ -472,3 +480,25 @@ func ApplyMigrations(ctx context.Context, db *BaseDB, return nil } + +// CompareRecords checks if the original and migrated objects are equal. If +// they are not, it returns an error with a unified diff of the two objects. +func CompareRecords(original, migrated any, identifier string) error { + if reflect.DeepEqual(original, migrated) { + return nil + } + + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines(spew.Sdump(original)), + B: difflib.SplitLines(spew.Sdump(migrated)), + FromFile: "Expected", + FromDate: "", + ToFile: "Actual", + ToDate: "", + Context: 3, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + + return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier, + diffText) +} From 8b2f4821d4c40103d1f928e5f865ab3fb2df795e Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 4 Jul 2025 10:39:12 +0200 Subject: [PATCH 6/8] graph/db: migrate nodes to SQL store This commit expands MigrateGraphToSQL so that it migrates all the graph nodes from kvdb to SQL. The TestMigrateGraphToSQL test is updated to cover the basic LightningNode cases. A new test, `TestSQLMigrationEdgeCases`, is also added and a case is added to tests the edge case where a node exists in our kvdb store that has invalid TLV bytes. --- graph/db/sql_migration.go | 128 ++++++++++++++++- graph/db/sql_migration_test.go | 243 ++++++++++++++++++++++++++++++++- 2 files changed, 367 insertions(+), 4 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 67607bbb9..c8b362e70 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -1,13 +1,19 @@ package graphdb import ( + "cmp" "context" "errors" "fmt" + "net" + "slices" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/sqldb" + "github.com/lightningnetwork/lnd/sqldb/sqlc" ) // MigrateGraphToSQL migrates the graph store from a KV backend to a SQL @@ -16,8 +22,8 @@ import ( // NOTE: this is currently not called from any code path. It is called via tests // only for now and will be called from the main lnd binary once the // migration is fully implemented and tested. -func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend, - _ SQLQueries, _ chainhash.Hash) error { +func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries, _ chainhash.Hash) error { log.Infof("Starting migration of the graph store from KV to SQL") t0 := time.Now() @@ -32,6 +38,11 @@ func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend, return nil } + // 1) Migrate all the nodes. + if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil { + return fmt.Errorf("could not migrate nodes: %w", err) + } + log.Infof("Finished migration of the graph store from KV to SQL in %v", time.Since(t0)) @@ -60,3 +71,116 @@ func checkGraphExists(db kvdb.Backend) (bool, error) { return true, nil } + +// migrateNodes migrates all nodes from the KV backend to the SQL database. +// This includes doing a sanity check after each migration to ensure that the +// migrated node matches the original node. +func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries) error { + + // Keep track of the number of nodes migrated and the number of + // nodes skipped due to errors. + var ( + count uint64 + skipped uint64 + ) + + // Loop through each node in the KV store and insert it into the SQL + // database. + err := forEachNode(kvBackend, func(_ kvdb.RTx, + node *models.LightningNode) error { + + pub := node.PubKeyBytes + + // Sanity check to ensure that the node has valid extra opaque + // data. If it does not, we'll skip it. We need to do this + // because previously we would just persist any TLV bytes that + // we received without validating them. Now, however, we + // normalise the storage of extra opaque data, so we need to + // ensure that the data is valid. We don't want to abort the + // migration if we encounter a node with invalid extra opaque + // data, so we'll just skip it and log a warning. + _, err := marshalExtraOpaqueData(node.ExtraOpaqueData) + if errors.Is(err, ErrParsingExtraTLVBytes) { + skipped++ + log.Warnf("Skipping migration of node %x with invalid "+ + "extra opaque data: %v", pub, + node.ExtraOpaqueData) + + return nil + } else if err != nil { + return fmt.Errorf("unable to marshal extra "+ + "opaque data for node %x: %w", pub, err) + } + + count++ + + // TODO(elle): At this point, we should check the loaded node + // to see if we should extract any DNS addresses from its + // opaque type addresses. This is expected to be done in: + // https://github.com/lightningnetwork/lnd/pull/9455. + // This TODO is being tracked in + // https://github.com/lightningnetwork/lnd/issues/9795 as this + // must be addressed before making this code path active in + // production. + + // Write the node to the SQL database. + id, err := upsertNode(ctx, sqlDB, node) + if err != nil { + return fmt.Errorf("could not persist node(%x): %w", pub, + err) + } + + // Fetch it from the SQL store and compare it against the + // original node object to ensure the migration was successful. + dbNode, err := sqlDB.GetNodeByPubKey( + ctx, sqlc.GetNodeByPubKeyParams{ + PubKey: node.PubKeyBytes[:], + Version: int16(ProtocolV1), + }, + ) + if err != nil { + return fmt.Errorf("could not get node by pubkey (%x)"+ + "after migration: %w", pub, err) + } + + // Sanity check: ensure the migrated node ID matches the one we + // just inserted. + if dbNode.ID != id { + return fmt.Errorf("node ID mismatch for node (%x) "+ + "after migration: expected %d, got %d", + pub, id, dbNode.ID) + } + + migratedNode, err := buildNode(ctx, sqlDB, &dbNode) + if err != nil { + return fmt.Errorf("could not build migrated node "+ + "from dbNode(db id: %d, node pub: %x): %w", + dbNode.ID, pub, err) + } + + // Make sure that the node addresses are sorted before + // comparing them to ensure that the order of addresses does + // not affect the comparison. + slices.SortFunc(node.Addresses, func(i, j net.Addr) int { + return cmp.Compare(i.String(), j.String()) + }) + slices.SortFunc( + migratedNode.Addresses, func(i, j net.Addr) int { + return cmp.Compare(i.String(), j.String()) + }, + ) + + return sqldb.CompareRecords( + node, migratedNode, fmt.Sprintf("node %x", pub), + ) + }) + if err != nil { + return fmt.Errorf("could not migrate nodes: %w", err) + } + + log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+ + "invalid TLV streams)", count, skipped) + + return nil +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index 10e070c0f..542e2996d 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -3,23 +3,39 @@ package graphdb import ( + "bytes" "context" + "fmt" + "image/color" + "net" "os" "path" + "slices" "strings" "testing" "time" + "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb/sqlbase" "github.com/lightningnetwork/lnd/kvdb/sqlite" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/sqldb" "github.com/stretchr/testify/require" ) -var testChain = *chaincfg.MainNetParams.GenesisHash +var ( + testChain = *chaincfg.MainNetParams.GenesisHash + testColor = color.RGBA{R: 1, G: 2, B: 3} + testTime = time.Unix(11111, 0) + testSigBytes = testSig.Serialize() + testExtraData = []byte{1, 1, 1, 2, 2, 2, 2} + testEmptyFeatures = lnwire.EmptyFeatureVector() +) // TestMigrateGraphToSQL tests various deterministic cases that we want to test // for to ensure that our migration from a graph store backed by a KV DB to a @@ -29,6 +45,19 @@ func TestMigrateGraphToSQL(t *testing.T) { t.Parallel() ctx := context.Background() + writeUpdate := func(t *testing.T, db *KVStore, object any) { + t.Helper() + + var err error + switch obj := object.(type) { + case *models.LightningNode: + err = db.AddLightningNode(ctx, obj) + default: + err = fmt.Errorf("unhandled object type: %T", obj) + } + require.NoError(t, err) + } + tests := []struct { name string write func(t *testing.T, db *KVStore, object any) @@ -38,6 +67,44 @@ func TestMigrateGraphToSQL(t *testing.T) { { name: "empty", }, + { + name: "nodes", + write: writeUpdate, + //nolint:ll + objects: []any{ + // Normal node with all fields. + makeTestNode(t), + // A node with no node announcement. + makeTestShellNode(t), + // A node with an announcement but no addresses. + makeTestNode(t, func(n *models.LightningNode) { + n.Addresses = nil + }), + // A node with all types of addresses. + makeTestNode(t, func(n *models.LightningNode) { + n.Addresses = []net.Addr{ + testAddr, + testIPV4Addr, + testIPV6Addr, + anotherAddr, + testOnionV2Addr, + testOnionV3Addr, + testOpaqueAddr, + } + }), + // No extra opaque data. + makeTestNode(t, func(n *models.LightningNode) { + n.ExtraOpaqueData = nil + }), + // A node with no features. + makeTestNode(t, func(n *models.LightningNode) { + n.Features = lnwire.EmptyFeatureVector() + }), + }, + expGraphStats: graphStats{ + numNodes: 6, + }, + }, } for _, test := range tests { @@ -70,11 +137,45 @@ func TestMigrateGraphToSQL(t *testing.T) { // graphStats holds expected statistics about the graph after migration. type graphStats struct { + numNodes int } // assertInSync checks that the KVStore and SQLStore both contain the same // graph data after migration. -func assertInSync(_ *testing.T, _ *KVStore, _ *SQLStore, stats graphStats) { +func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore, + stats graphStats) { + + // 1) Compare the nodes in the two stores. + sqlNodes := fetchAllNodes(t, sqlDB) + require.Len(t, sqlNodes, stats.numNodes) + require.Equal(t, fetchAllNodes(t, kvDB), sqlNodes) +} + +// fetchAllNodes retrieves all nodes from the given store and returns them +// sorted by their public key. +func fetchAllNodes(t *testing.T, store V1Store) []*models.LightningNode { + nodes := make([]*models.LightningNode, 0) + + err := store.ForEachNode(func(tx NodeRTx) error { + node := tx.Node() + + // Call PubKey to ensure the objects cached pubkey is set so that + // the objects can be compared as a whole. + _, err := node.PubKey() + require.NoError(t, err) + + nodes = append(nodes, node) + + return nil + }) + require.NoError(t, err) + + // Sort the nodes by their public key to ensure a consistent order. + slices.SortFunc(nodes, func(i, j *models.LightningNode) int { + return bytes.Compare(i.PubKeyBytes[:], j.PubKeyBytes[:]) + }) + + return nodes } // setUpKVStore initializes a new KVStore for testing. @@ -89,6 +190,72 @@ func setUpKVStore(t *testing.T) *KVStore { return kvStore } +// genPubKey generates a new public key for testing purposes. +func genPubKey(t *testing.T) route.Vertex { + key, err := btcec.NewPrivateKey() + require.NoError(t, err) + + var pub route.Vertex + copy(pub[:], key.PubKey().SerializeCompressed()) + + return pub +} + +// testNodeOpt defines a functional option type that can be used to +// modify the attributes of a models.LightningNode crated by makeTestNode. +type testNodeOpt func(*models.LightningNode) + +// makeTestNode can be used to create a test models.LightningNode. The +// functional options can be used to modify the node's attributes. +func makeTestNode(t *testing.T, opts ...testNodeOpt) *models.LightningNode { + n := &models.LightningNode{ + HaveNodeAnnouncement: true, + AuthSigBytes: testSigBytes, + LastUpdate: testTime, + Color: testColor, + Alias: "kek", + Features: testFeatures, + Addresses: testAddrs, + ExtraOpaqueData: testExtraData, + PubKeyBytes: genPubKey(t), + } + + for _, opt := range opts { + opt(n) + } + + // We call this method so that the internal pubkey field is populated + // which then lets us to proper struct comparison later on. + _, err := n.PubKey() + require.NoError(t, err) + + return n +} + +// makeTestShellNode creates a minimal models.LightningNode +// that only contains the public key and no other attributes. +func makeTestShellNode(t *testing.T, + opts ...testNodeOpt) *models.LightningNode { + + n := &models.LightningNode{ + HaveNodeAnnouncement: false, + PubKeyBytes: genPubKey(t), + Features: testEmptyFeatures, + LastUpdate: time.Unix(0, 0), + } + + for _, opt := range opts { + opt(n) + } + + // We call this method so that the internal pubkey field is populated + // which then lets us to proper struct comparison later on. + _, err := n.PubKey() + require.NoError(t, err) + + return n +} + // TestMigrationWithChannelDB tests the migration of the graph store from a // bolt backed channel.db or a kvdb channel.sqlite to a SQL database. Note that // this test does not attempt to be a complete migration test for all graph @@ -207,3 +374,75 @@ func TestMigrationWithChannelDB(t *testing.T) { }) } } + +// TestSQLMigrationEdgeCases tests various edge cases where the migration will +// still be successful but the final states of the KVStore and SQLStore +// will differ slightly. +func TestSQLMigrationEdgeCases(t *testing.T) { + t.Parallel() + ctx := context.Background() + + var invalidTLVData = []byte{0x01, 0x02, 0x03} + + // Here, we test that in the case where the KV store contains a node + // with invalid TLV data, the migration will still succeed, but the + // node will not end up in the SQL store. + t.Run("node with bad tlv data", func(t *testing.T) { + // Make one valid node and one node with invalid TLV data. + n1 := makeTestNode(t) + n2 := makeTestNode(t, func(n *models.LightningNode) { + n.ExtraOpaqueData = invalidTLVData + }) + + populateKV := func(t *testing.T, db *KVStore) { + // Insert both nodes into the KV store. + require.NoError(t, db.AddLightningNode(ctx, n1)) + require.NoError(t, db.AddLightningNode(ctx, n2)) + } + + runTestMigration(t, populateKV, dbState{ + // We expect only the valid node to be present in the + // SQL db. + nodes: []*models.LightningNode{n1}, + }) + }) +} + +// runTestMigration is a helper function that sets up the KVStore and SQLStore, +// populates the KVStore with the provided call-back, runs the migration, and +// asserts that the SQLStore contains the expected state. +func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore), + expState dbState) { + + ctx := context.Background() + + // Set up our source kvdb DB. + kvDB := setUpKVStore(t) + + // Set up our destination SQL DB. + sql, ok := NewTestDB(t).(*SQLStore) + require.True(t, ok) + + // Populate the kvdb store with the test data. + populateKV(t, kvDB) + + // Run the migration. + err := MigrateGraphToSQL( + ctx, kvDB.db, sql.db, testChain, + ) + require.NoError(t, err) + + assertResultState(t, sql, expState) +} + +// dbState describes the expected state of the SQLStore after a migration. +type dbState struct { + nodes []*models.LightningNode +} + +// assertResultState asserts that the SQLStore contains the expected +// state after a migration. +func assertResultState(t *testing.T, sql *SQLStore, expState dbState) { + // Assert that the sql store contains the expected nodes. + require.ElementsMatch(t, expState.nodes, fetchAllNodes(t, sql)) +} From 9657707175ce306ef9f3adae6f011d9f36c6b324 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 4 Jul 2025 10:54:44 +0200 Subject: [PATCH 7/8] graph/db: migrate source node to SQL store --- graph/db/sql_migration.go | 77 ++++++++++++++++++++++++++++++++++ graph/db/sql_migration_test.go | 39 ++++++++++++++++- 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index c8b362e70..a70dd62d0 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -43,6 +43,11 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, return fmt.Errorf("could not migrate nodes: %w", err) } + // 2) Migrate the source node. + if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil { + return fmt.Errorf("could not migrate source node: %w", err) + } + log.Infof("Finished migration of the graph store from KV to SQL in %v", time.Since(t0)) @@ -184,3 +189,75 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, return nil } + +// migrateSourceNode migrates the source node from the KV backend to the +// SQL database. +func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend, + sqlDB SQLQueries) error { + + sourceNode, err := sourceNode(kvdb) + if errors.Is(err, ErrSourceNodeNotSet) { + // If the source node has not been set yet, we can skip this + // migration step. + return nil + } else if err != nil { + return fmt.Errorf("could not get source node from kv "+ + "store: %w", err) + } + + pub := sourceNode.PubKeyBytes + + // Get the DB ID of the source node by its public key. This node must + // already exist in the SQL database, as it should have been migrated + // in the previous node-migration step. + id, err := sqlDB.GetNodeIDByPubKey( + ctx, sqlc.GetNodeIDByPubKeyParams{ + PubKey: pub[:], + Version: int16(ProtocolV1), + }, + ) + if err != nil { + return fmt.Errorf("could not get source node ID: %w", err) + } + + // Now we can add the source node to the SQL database. + err = sqlDB.AddSourceNode(ctx, id) + if err != nil { + return fmt.Errorf("could not add source node to SQL store: %w", + err) + } + + // Verify that the source node was added correctly by fetching it back + // from the SQL database and checking that the expected DB ID and + // pub key are returned. We don't need to do a whole node comparison + // here, as this was already done in the previous migration step. + srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1)) + if err != nil { + return fmt.Errorf("could not get source nodes from SQL "+ + "store: %w", err) + } + + // The SQL store has support for multiple source nodes (for future + // protocol versions) but this migration is purely aimed at the V1 + // store, and so we expect exactly one source node to be present. + if len(srcNodes) != 1 { + return fmt.Errorf("expected exactly one source node, "+ + "got %d", len(srcNodes)) + } + + // Check that the source node ID and pub key match the original + // source node. + if srcNodes[0].NodeID != id { + return fmt.Errorf("source node ID mismatch after migration: "+ + "expected %d, got %d", id, srcNodes[0].NodeID) + } + err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node") + if err != nil { + return fmt.Errorf("source node pubkey mismatch after "+ + "migration: %w", err) + } + + log.Infof("Migrated source node with pubkey %x to SQL", pub[:]) + + return nil +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index 542e2996d..8e0df1c90 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -5,6 +5,7 @@ package graphdb import ( "bytes" "context" + "errors" "fmt" "image/color" "net" @@ -105,6 +106,23 @@ func TestMigrateGraphToSQL(t *testing.T) { numNodes: 6, }, }, + { + name: "source node", + write: func(t *testing.T, db *KVStore, object any) { + node, ok := object.(*models.LightningNode) + require.True(t, ok) + + err := db.SetSourceNode(ctx, node) + require.NoError(t, err) + }, + objects: []any{ + makeTestNode(t), + }, + expGraphStats: graphStats{ + numNodes: 1, + srcNodeSet: true, + }, + }, } for _, test := range tests { @@ -137,7 +155,8 @@ func TestMigrateGraphToSQL(t *testing.T) { // graphStats holds expected statistics about the graph after migration. type graphStats struct { - numNodes int + numNodes int + srcNodeSet bool } // assertInSync checks that the KVStore and SQLStore both contain the same @@ -149,6 +168,12 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore, sqlNodes := fetchAllNodes(t, sqlDB) require.Len(t, sqlNodes, stats.numNodes) require.Equal(t, fetchAllNodes(t, kvDB), sqlNodes) + + // 2) Check that the source nodes match (if indeed source nodes have + // been set). + sqlSourceNode := fetchSourceNode(t, sqlDB) + require.Equal(t, stats.srcNodeSet, sqlSourceNode != nil) + require.Equal(t, fetchSourceNode(t, kvDB), sqlSourceNode) } // fetchAllNodes retrieves all nodes from the given store and returns them @@ -178,6 +203,18 @@ func fetchAllNodes(t *testing.T, store V1Store) []*models.LightningNode { return nodes } +// fetchSourceNode retrieves the source node from the given store. +func fetchSourceNode(t *testing.T, store V1Store) *models.LightningNode { + node, err := store.SourceNode(context.Background()) + if errors.Is(err, ErrSourceNodeNotSet) { + return nil + } else { + require.NoError(t, err) + } + + return node +} + // setUpKVStore initializes a new KVStore for testing. func setUpKVStore(t *testing.T) *KVStore { kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph") From cb959bddb0e589152dcbfcf643d89ce7babba890 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 4 Jul 2025 14:21:38 +0200 Subject: [PATCH 8/8] docs: add release note --- docs/release-notes/release-notes-0.20.0.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index ee36d7c8f..0db6fa11e 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -92,6 +92,8 @@ circuit. The indices are only available for forwarding events saved after v0.20. * [9](https://github.com/lightningnetwork/lnd/pull/9939) * [10](https://github.com/lightningnetwork/lnd/pull/9971) * [11](https://github.com/lightningnetwork/lnd/pull/9972) + * Add graph SQL migration logic: + * [1](https://github.com/lightningnetwork/lnd/pull/10036) ## RPC Updates * Previously the `RoutingPolicy` would return the inbound fee record in its