From b391503ddfbf4dd1b5c962aa0b665c53dbb7feb1 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Tue, 12 Jul 2022 20:25:49 +0800
Subject: [PATCH] migration30: validate migration results before deleting old
 buckets

This commit adds a new function, `validateMigration`, which is run
prior to the deletion of the old buckets as an extra safety check.
---
 channeldb/migration30/migration.go      | 103 ++++++++++++++++++++++++
 channeldb/migration30/migration_test.go |  76 ++++++++++++++++-
 channeldb/migration30/test_utils.go     |  53 ++++++++++--
 3 files changed, 222 insertions(+), 10 deletions(-)

diff --git a/channeldb/migration30/migration.go b/channeldb/migration30/migration.go
index afc1686a6..fc551ac2a 100644
--- a/channeldb/migration30/migration.go
+++ b/channeldb/migration30/migration.go
@@ -92,6 +92,15 @@ func MigrateRevocationLog(db kvdb.Backend) error {
 			progress, total-migrated)
 	}
 
+	// Before we can safely delete the old buckets, we perform a check to
+	// make sure the logs are migrated as expected.
+	err = kvdb.Update(db, validateMigration, func() {})
+	if err != nil {
+		return fmt.Errorf("validate migration failed: %v", err)
+	}
+
+	log.Info("Migration check passed, now deleting the old logs...")
+
 	// Once the migration completes, we can now safety delete the old
 	// revocation logs.
 	if err := deleteOldBuckets(db); err != nil {
@@ -551,3 +560,97 @@ func convertRevocationLog(commit *mig.ChannelCommitment,
 
 	return rl, nil
 }
+
+// validateMigration checks that the data saved in the new buckets matches the
+// data saved in the old buckets. It does so by checking that the last keys
+// saved in both buckets match, given the assumption that `CommitHeight` is a
+// monotonically increasing value so the last key represents the total number
+// of records saved.
+func validateMigration(tx kvdb.RwTx) error {
+	openChanBucket := tx.ReadWriteBucket(openChannelBucket)
+
+	// If no bucket is found, we can exit early.
+	if openChanBucket == nil {
+		return nil
+	}
+
+	// exitWithErr is a helper closure that prepends an error message with
+	// the locator info.
+	exitWithErr := func(l *updateLocator, msg string) error {
+		return fmt.Errorf("unmatched records found under <nodePub=%x, "+
+			"chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
+			l.chainHash, l.fundingOutpoint, msg)
+	}
+
+	// cb is the callback function to be used when iterating the buckets.
+	cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
+		// Read both the old and new revocation log buckets.
+		oldBucket := chanBucket.NestedReadBucket(
+			revocationLogBucketDeprecated,
+		)
+		newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
+
+		// Exit early if the old bucket is nil.
+		//
+		// NOTE: the new bucket may not be nil here as new logs might
+		// have been created using lnd@v0.15.0.
+		if oldBucket == nil {
+			return nil
+		}
+
+		// Return an error if the expected new bucket cannot be found.
+		if newBucket == nil {
+			return exitWithErr(l, "expected new bucket")
+		}
+
+		// Acquire the cursors.
+		oldCursor := oldBucket.ReadCursor()
+		newCursor := newBucket.ReadCursor()
+
+		// Jump to the end of the cursors to do a quick check.
+		newKey, _ := newCursor.Last()
+		oldKey, _ := oldCursor.Last()
+
+		// We expect the CommitHeights to match for nodes prior to
+		// v0.15.0.
+		if bytes.Equal(newKey, oldKey) {
+			return nil
+		}
+
+		// If the keys do not match, it's likely the node is running
+		// v0.15.0 and has new logs created. In this case, we will
+		// validate that every record in the old bucket can be found in
+		// the new bucket.
+		oldKey, _ = oldCursor.First()
+
+		for {
+			// Try to locate the old key in the new bucket and we
+			// expect it to be found.
+			newKey, _ := newCursor.Seek(oldKey)
+
+			// If the old key is not found in the new bucket,
+			// return an error.
+			//
+			// NOTE: because Seek will return the next key when the
+			// passed key cannot be found, we need to compare the
+			// keys to decide whether the old key is found or not.
+			if !bytes.Equal(newKey, oldKey) {
+				errMsg := fmt.Sprintf("CommitHeight=%v in "+
					"old bucket cannot be found in new "+
					"bucket", oldKey)
+				return exitWithErr(l, errMsg)
+			}
+
+			// Otherwise, keep iterating the old bucket.
+			oldKey, _ = oldCursor.Next()
+
+			// If we're done iterating, all keys have been matched
+			// and we can safely exit.
+			if oldKey == nil {
+				return nil
+			}
+		}
+	}
+
+	return iterateBuckets(openChanBucket, nil, cb)
+}
diff --git a/channeldb/migration30/migration_test.go b/channeldb/migration30/migration_test.go
index 472ba373b..573598ec3 100644
--- a/channeldb/migration30/migration_test.go
+++ b/channeldb/migration30/migration_test.go
@@ -116,6 +116,76 @@ func TestMigrateRevocationLog(t *testing.T) {
 	}
 }
 
+// TestValidateMigration checks that the function `validateMigration` behaves
+// as expected.
+func TestValidateMigration(t *testing.T) {
+	c := createTestChannel(nil)
+
+	testCases := []struct {
+		name       string
+		setup      func(db kvdb.Backend) error
+		expectFail bool
+	}{
+		{
+			// Finished prior to v0.15.0.
+			name: "valid migration",
+			setup: func(db kvdb.Backend) error {
+				return createFinished(db, c, true)
+			},
+			expectFail: false,
+		},
+		{
+			// Finished after v0.15.0.
+			name: "valid migration after v0.15.0",
+			setup: func(db kvdb.Backend) error {
+				return createFinished(db, c, false)
+			},
+			expectFail: false,
+		},
+		{
+			// Missing logs prior to v0.15.0.
+			name: "invalid migration",
+			setup: func(db kvdb.Backend) error {
+				return createNotFinished(db, c, true)
+			},
+			expectFail: true,
+		},
+		{
+			// Missing logs after v0.15.0.
+			name: "invalid migration after v0.15.0",
+			setup: func(db kvdb.Backend) error {
+				return createNotFinished(db, c, false)
+			},
+			expectFail: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+
+		// Create a test db.
+		cdb, cleanUp, err := migtest.MakeDB()
+		defer cleanUp()
+		require.NoError(t, err, "failed to create test db")
+
+		t.Run(tc.name, func(t *testing.T) {
+			// Setup test logs.
+			err := tc.setup(cdb)
+			require.NoError(t, err, "failed to setup")
+
+			// Call the actual function and check the error is
+			// returned as expected.
+			err = kvdb.Update(cdb, validateMigration, func() {})
+
+			if tc.expectFail {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
 // createTwoChannels creates two channels that have the same chainHash and
 // IdentityPub, simulating having two channels under the same peer.
 func createTwoChannels() (*mig26.OpenChannel, *mig26.OpenChannel) {
@@ -186,7 +256,7 @@ func buildChannelCases(c *mig26.OpenChannel,
 	case3 := &channelTestCase{
 		name: "finished migration",
 		setup: func(db kvdb.Backend) error {
-			return createFinished(db, c)
+			return createFinished(db, c, true)
 		},
 		asserter: func(t *testing.T, db kvdb.Backend) {
 			// Check that the old bucket is removed.
@@ -227,7 +297,7 @@ func buildChannelCases(c *mig26.OpenChannel,
 	case4 := &channelTestCase{
 		name: "unfinished migration",
 		setup: func(db kvdb.Backend) error {
-			return createNotFinished(db, c)
+			return createNotFinished(db, c, true)
 		},
 		asserter: func(t *testing.T, db kvdb.Backend) {
 			// Check that the old bucket is removed.
@@ -259,7 +329,7 @@ func buildChannelCases(c *mig26.OpenChannel,
 	case5 := &channelTestCase{
 		name: "initial migration",
 		setup: func(db kvdb.Backend) error {
-			return createNotStarted(db, c)
+			return createNotStarted(db, c, true)
 		},
 		asserter:   assertNewLogs,
 		unfinished: true,
diff --git a/channeldb/migration30/test_utils.go b/channeldb/migration30/test_utils.go
index 77d6f0198..c964dfb75 100644
--- a/channeldb/migration30/test_utils.go
+++ b/channeldb/migration30/test_utils.go
@@ -176,6 +176,19 @@ var (
 		},
 	}
 
+	// newLog3 defines a revocation log that's been created after v0.15.0.
+	newLog3 = mig.ChannelCommitment{
+		CommitHeight:    logHeight2 + 1,
+		LocalLogIndex:   1,
+		LocalHtlcIndex:  1,
+		RemoteLogIndex:  0,
+		RemoteHtlcIndex: 0,
+		LocalBalance:    lnwire.MilliSatoshi(888_800_000),
+		RemoteBalance:   0,
+		CommitTx:        commitTx2,
+		Htlcs:           []mig.HTLC{htlc},
+	}
+
 	// The following public keys are taken from the itest results.
 	localMusigKey, _ = btcec.ParsePubKey([]byte{
 		0x2,
@@ -489,27 +502,53 @@ func createTestStore() (shachain.Store, error) {
 }
 
 // createNotStarted will setup a situation where we haven't started the
-// migration for the channel.
-func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel) error {
+// migration for the channel. The `legacy` flag denotes whether to simulate a
+// node running a version prior to v0.15.0.
+func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel,
+	legacy bool) error {
+
+	var newLogs []mig.ChannelCommitment
+
 	// Create test logs.
 	oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
-	return setupTestLogs(cdb, c, oldLogs, nil)
+
+	// Add a new log if the node is running v0.15.0.
+	if !legacy {
+		newLogs = []mig.ChannelCommitment{newLog3}
+	}
+	return setupTestLogs(cdb, c, oldLogs, newLogs)
 }
 
 // createNotFinished will setup a situation where we have un-migrated logs and
-// return the next migration height.
-func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel) error {
+// return the next migration height. The `legacy` flag denotes whether to
+// simulate a node running a version prior to v0.15.0.
+func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
+	legacy bool) error {
+
 	// Create test logs.
 	oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
 	newLogs := []mig.ChannelCommitment{oldLog1}
+
+	// Add a new log if the node is running v0.15.0.
+	if !legacy {
+		newLogs = append(newLogs, newLog3)
+	}
 	return setupTestLogs(cdb, c, oldLogs, newLogs)
 }
 
 // createFinished will setup a situation where all the old logs have been
-// migrated and return a nil.
-func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel) error {
+// migrated and returns nil. The `legacy` flag denotes whether to simulate a
+// node running a version prior to v0.15.0.
+func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
+	legacy bool) error {
+
 	// Create test logs.
 	oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
 	newLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
+
+	// Add a new log if the node is running v0.15.0.
+	if !legacy {
+		newLogs = append(newLogs, newLog3)
+	}
 	return setupTestLogs(cdb, c, oldLogs, newLogs)
 }
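
Reviewer note (not part of the patch): the validation above hinges on the cursor
Seek-based containment check, i.e. every CommitHeight key in the old revocation
log bucket must have an exact match in the new bucket. Below is a minimal,
self-contained sketch of that idea written directly against go.etcd.io/bbolt
rather than the kvdb wrappers used in the patch. The helper name checkSubset,
the bucket names "old"/"new", and the demo.db file are illustrative assumptions
only, not part of lnd.

package main

import (
	"bytes"
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

// checkSubset returns an error if any key present in oldBkt is missing from
// newBkt, mirroring the Seek-based walk used by validateMigration.
func checkSubset(oldBkt, newBkt *bolt.Bucket) error {
	newCursor := newBkt.Cursor()
	oldCursor := oldBkt.Cursor()

	for oldKey, _ := oldCursor.First(); oldKey != nil; oldKey, _ = oldCursor.Next() {
		// Seek positions the cursor at the first key >= oldKey, so an
		// exact byte match is required before treating the record as
		// migrated.
		newKey, _ := newCursor.Seek(oldKey)
		if !bytes.Equal(newKey, oldKey) {
			return fmt.Errorf("key %x missing from new bucket", oldKey)
		}
	}

	return nil
}

func main() {
	db, err := bolt.Open("demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(tx *bolt.Tx) error {
		oldBkt, err := tx.CreateBucketIfNotExists([]byte("old"))
		if err != nil {
			return err
		}
		newBkt, err := tx.CreateBucketIfNotExists([]byte("new"))
		if err != nil {
			return err
		}

		// The new bucket holds every old key plus one extra record,
		// similar to a node that created fresh logs on v0.15.0.
		for _, k := range []string{"0001", "0002"} {
			if err := oldBkt.Put([]byte(k), []byte{}); err != nil {
				return err
			}
		}
		for _, k := range []string{"0001", "0002", "0003"} {
			if err := newBkt.Put([]byte(k), []byte{}); err != nil {
				return err
			}
		}

		return checkSubset(oldBkt, newBkt)
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("all old keys found in new bucket")
}

Because Seek never reports "not found" directly, the bytes.Equal comparison is
what turns the lookup into a strict membership test; the extra key in the new
bucket is tolerated, which is exactly the situation the patch handles for nodes
already running v0.15.0.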