diff --git a/go.mod b/go.mod index ba53873e1..c660cbb5a 100644 --- a/go.mod +++ b/go.mod @@ -138,7 +138,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.12 // indirect github.com/ory/dockertest/v3 v3.10.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect diff --git a/invoices/kv_sql_migration_test.go b/invoices/kv_sql_migration_test.go new file mode 100644 index 000000000..5d36b5c64 --- /dev/null +++ b/invoices/kv_sql_migration_test.go @@ -0,0 +1,146 @@ +package invoices_test + +import ( + "context" + "database/sql" + "testing" + "time" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/clock" + invpkg "github.com/lightningnetwork/lnd/invoices" + "github.com/lightningnetwork/lnd/sqldb" + "github.com/lightningnetwork/lnd/sqldb/sqlc" + "github.com/stretchr/testify/require" +) + +// TestMigrationWithChannelDB tests the migration of invoices from a bolt backed +// channel.db to a SQL database. Note that this test does not attempt to be a +// complete migration test for all invoice types but rather is added as a tool +// for developers and users to debug invoice migration issues with an actual +// channel.db file. +func TestMigrationWithChannelDB(t *testing.T) { + // First create a shared Postgres instance so we don't spawn a new + // docker container for each test. + pgFixture := sqldb.NewTestPgFixture( + t, sqldb.DefaultPostgresFixtureLifetime, + ) + t.Cleanup(func() { + pgFixture.TearDown(t) + }) + + makeSQLDB := func(t *testing.T, sqlite bool) (*invpkg.SQLStore, + *sqldb.TransactionExecutor[*sqlc.Queries]) { + + var db *sqldb.BaseDB + if sqlite { + db = sqldb.NewTestSqliteDB(t).BaseDB + } else { + db = sqldb.NewTestPostgresDB(t, pgFixture).BaseDB + } + + invoiceExecutor := sqldb.NewTransactionExecutor( + db, func(tx *sql.Tx) invpkg.SQLInvoiceQueries { + return db.WithTx(tx) + }, + ) + + genericExecutor := sqldb.NewTransactionExecutor( + db, func(tx *sql.Tx) *sqlc.Queries { + return db.WithTx(tx) + }, + ) + + testClock := clock.NewTestClock(time.Unix(1, 0)) + + return invpkg.NewSQLStore(invoiceExecutor, testClock), + genericExecutor + } + + migrationTest := func(t *testing.T, kvStore *channeldb.DB, + sqlite bool) { + + sqlInvoiceStore, sqlStore := makeSQLDB(t, sqlite) + ctxb := context.Background() + + const batchSize = 11 + var opts sqldb.MigrationTxOptions + err := sqlStore.ExecTx( + ctxb, &opts, func(tx *sqlc.Queries) error { + return invpkg.MigrateInvoicesToSQL( + ctxb, kvStore.Backend, kvStore, tx, + batchSize, + ) + }, func() {}, + ) + require.NoError(t, err) + + // MigrateInvoices will check if the inserted invoice equals to + // the migrated one, but as a sanity check, we'll also fetch the + // invoices from the store and compare them to the original + // invoices. + query := invpkg.InvoiceQuery{ + IndexOffset: 0, + // As a sanity check, fetch more invoices than we have + // to ensure that we did not add any extra invoices. + // Note that we don't really have a way to know the + // exact number of invoices in the bolt db without first + // iterating over all of them, but for test purposes + // constant should be enough. + NumMaxInvoices: 9999, + } + result1, err := kvStore.QueryInvoices(ctxb, query) + require.NoError(t, err) + numInvoices := len(result1.Invoices) + + result2, err := sqlInvoiceStore.QueryInvoices(ctxb, query) + require.NoError(t, err) + require.Equal(t, numInvoices, len(result2.Invoices)) + + // Simply zero out the add index so we don't fail on that when + // comparing. + for i := 0; i < numInvoices; i++ { + result1.Invoices[i].AddIndex = 0 + result2.Invoices[i].AddIndex = 0 + + // We need to override the timezone of the invoices as + // the provided DB vs the test runners local time zone + // might be different. + invpkg.OverrideInvoiceTimeZone(&result1.Invoices[i]) + invpkg.OverrideInvoiceTimeZone(&result2.Invoices[i]) + + require.Equal( + t, result1.Invoices[i], result2.Invoices[i], + ) + } + } + + tests := []struct { + name string + dbPath string + }{ + { + "empty", + t.TempDir(), + }, + { + "testdata", + "testdata", + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + store := channeldb.OpenForTesting(t, test.dbPath) + + t.Run("Postgres", func(t *testing.T) { + migrationTest(t, store, false) + }) + + t.Run("SQLite", func(t *testing.T) { + migrationTest(t, store, true) + }) + }) + } +} diff --git a/invoices/sql_migration.go b/invoices/sql_migration.go index 47ee3e329..af0e4865f 100644 --- a/invoices/sql_migration.go +++ b/invoices/sql_migration.go @@ -4,15 +4,19 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" + "reflect" "strconv" "time" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/sqldb" "github.com/lightningnetwork/lnd/sqldb/sqlc" + "github.com/pmezard/go-difflib/difflib" ) var ( @@ -48,6 +52,11 @@ var ( // // addIndexNo => invoiceKey addIndexBucket = []byte("invoice-add-index") + + // ErrMigrationMismatch is returned when the migrated invoice does not + // match the original invoice. + ErrMigrationMismatch = fmt.Errorf("migrated invoice does not match " + + "original invoice") ) // createInvoiceHashIndex generates a hash index that contains payment hashes @@ -60,7 +69,7 @@ var ( // a new index in the SQL database that maps each invoice key to its // corresponding payment hash. func createInvoiceHashIndex(ctx context.Context, db kvdb.Backend, - tx SQLInvoiceQueries) error { + tx *sqlc.Queries) error { return db.View(func(kvTx kvdb.RTx) error { invoices := kvTx.ReadBucket(invoiceBucket) @@ -399,3 +408,151 @@ func OverrideInvoiceTimeZone(invoice *Invoice) { } } } + +// MigrateInvoicesToSQL runs the migration of all invoices from the KV database +// to the SQL database. The migration is done in a single transaction to ensure +// that all invoices are migrated or none at all. This function can be run +// multiple times without causing any issues as it will check if the migration +// has already been performed. +func MigrateInvoicesToSQL(ctx context.Context, db kvdb.Backend, + kvStore InvoiceDB, tx *sqlc.Queries, batchSize int) error { + + log.Infof("Starting migration of invoices from KV to SQL") + + offset := uint64(0) + t0 := time.Now() + + // Create the hash index which we will use to look up invoice + // payment hashes by their add index during migration. + err := createInvoiceHashIndex(ctx, db, tx) + if err != nil && !errors.Is(err, ErrNoInvoicesCreated) { + log.Errorf("Unable to create invoice hash index: %v", + err) + + return err + } + log.Debugf("Created SQL invoice hash index in %v", time.Since(t0)) + + total := 0 + // Now we can start migrating the invoices. We'll do this in + // batches to reduce memory usage. + for { + t0 = time.Now() + query := InvoiceQuery{ + IndexOffset: offset, + NumMaxInvoices: uint64(batchSize), + } + + queryResult, err := kvStore.QueryInvoices(ctx, query) + if err != nil && !errors.Is(err, ErrNoInvoicesCreated) { + return fmt.Errorf("unable to query invoices: "+ + "%w", err) + } + + if len(queryResult.Invoices) == 0 { + log.Infof("All invoices migrated") + + break + } + + err = migrateInvoices(ctx, tx, queryResult.Invoices) + if err != nil { + return err + } + + offset = queryResult.LastIndexOffset + total += len(queryResult.Invoices) + log.Debugf("Migrated %d KV invoices to SQL in %v\n", total, + time.Since(t0)) + } + + // Clean up the hash index as it's no longer needed. + err = tx.ClearKVInvoiceHashIndex(ctx) + if err != nil { + return fmt.Errorf("unable to clear invoice hash "+ + "index: %w", err) + } + + log.Infof("Migration of %d invoices from KV to SQL completed", total) + + return nil +} + +func migrateInvoices(ctx context.Context, tx *sqlc.Queries, + invoices []Invoice) error { + + for i, invoice := range invoices { + var paymentHash lntypes.Hash + if invoice.Terms.PaymentPreimage != nil { + paymentHash = invoice.Terms.PaymentPreimage.Hash() + } else { + paymentHashBytes, err := + tx.GetKVInvoicePaymentHashByAddIndex( + ctx, int64(invoice.AddIndex), + ) + if err != nil { + // This would be an unexpected inconsistency + // in the kv database. We can't do much here + // so we'll notify the user and continue. + log.Warnf("Cannot migrate invoice, unable to "+ + "fetch payment hash (add_index=%v): %v", + invoice.AddIndex, err) + + continue + } + + copy(paymentHash[:], paymentHashBytes) + } + + err := MigrateSingleInvoice(ctx, tx, &invoices[i], paymentHash) + if err != nil { + return fmt.Errorf("unable to migrate invoice(%v): %w", + paymentHash, err) + } + + migratedInvoice, err := fetchInvoice( + ctx, tx, InvoiceRefByHash(paymentHash), + ) + if err != nil { + return fmt.Errorf("unable to fetch migrated "+ + "invoice(%v): %w", paymentHash, err) + } + + // Override the time zone for comparison. Note that we need to + // override both invoices as the original invoice is coming from + // KV database, it was stored as a binary serialized Go + // time.Time value which has nanosecond precision but might have + // been created in a different time zone. The migrated invoice + // is stored in SQL in UTC and selected in the local time zone, + // however in PostgreSQL it has microsecond precision while in + // SQLite it has nanosecond precision if using TEXT storage + // class. + OverrideInvoiceTimeZone(&invoice) + OverrideInvoiceTimeZone(migratedInvoice) + + // Override the add index before checking for equality. + migratedInvoice.AddIndex = invoice.AddIndex + + if !reflect.DeepEqual(invoice, *migratedInvoice) { + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines( + spew.Sdump(invoice), + ), + B: difflib.SplitLines( + spew.Sdump(migratedInvoice), + ), + FromFile: "Expected", + FromDate: "", + ToFile: "Actual", + ToDate: "", + Context: 3, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + + return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch, + paymentHash, diffText) + } + } + + return nil +} diff --git a/invoices/sql_store.go b/invoices/sql_store.go index c9ffcc44c..8a819e5ba 100644 --- a/invoices/sql_store.go +++ b/invoices/sql_store.go @@ -138,6 +138,8 @@ type SQLInvoiceQueries interface { //nolint:interfacebloat GetKVInvoicePaymentHashByAddIndex(ctx context.Context, addIndex int64) ( []byte, error) + + ClearKVInvoiceHashIndex(ctx context.Context) error } var _ InvoiceDB = (*SQLStore)(nil) @@ -354,8 +356,8 @@ func (i *SQLStore) AddInvoice(ctx context.Context, // fetchInvoice fetches the common invoice data and the AMP state for the // invoice with the given reference. -func (i *SQLStore) fetchInvoice(ctx context.Context, - db SQLInvoiceQueries, ref InvoiceRef) (*Invoice, error) { +func fetchInvoice(ctx context.Context, db SQLInvoiceQueries, + ref InvoiceRef) (*Invoice, error) { if ref.PayHash() == nil && ref.PayAddr() == nil && ref.SetID() == nil { return nil, ErrInvoiceNotFound @@ -686,7 +688,7 @@ func (i *SQLStore) LookupInvoice(ctx context.Context, readTxOpt := NewSQLInvoiceQueryReadTx() txErr := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { - invoice, err = i.fetchInvoice(ctx, db, ref) + invoice, err = fetchInvoice(ctx, db, ref) return err }, func() {}) @@ -1387,7 +1389,7 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef, ref.refModifier = HtlcSetOnlyModifier } - invoice, err := i.fetchInvoice(ctx, db, ref) + invoice, err := fetchInvoice(ctx, db, ref) if err != nil { return err } diff --git a/invoices/testdata/channel.db b/invoices/testdata/channel.db new file mode 100644 index 000000000..78741eae9 Binary files /dev/null and b/invoices/testdata/channel.db differ