migration30: validate migration results before deleting old buckets

This commit adds a new method `validateMigration` to be used prior to
the deletion of the old buckets to be extraly cautious.
This commit is contained in:
yyforyongyu
2022-07-12 20:25:49 +08:00
parent 78a73f9761
commit b391503ddf
3 changed files with 222 additions and 10 deletions

View File

@@ -92,6 +92,15 @@ func MigrateRevocationLog(db kvdb.Backend) error {
progress, total-migrated)
}
// Before we can safety delete the old buckets, we perform a check to
// make sure the logs are migrated as expected.
err = kvdb.Update(db, validateMigration, func() {})
if err != nil {
return fmt.Errorf("validate migration failed: %v", err)
}
log.Info("Migration check passed, now deleting the old logs...")
// Once the migration completes, we can now safety delete the old
// revocation logs.
if err := deleteOldBuckets(db); err != nil {
@@ -551,3 +560,97 @@ func convertRevocationLog(commit *mig.ChannelCommitment,
return rl, nil
}
// validateMigration checks that the data saved in the new buckets match those
// saved in the old buckets. It does so by checking the last keys saved in both
// buckets can match, given the assumption that the `CommitHeight` is
// monotonically increased value so the last key represents the total number of
// records saved.
func validateMigration(tx kvdb.RwTx) error {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early.
if openChanBucket == nil {
return nil
}
// exitWithErr is a helper closure that prepends an error message with
// the locator info.
exitWithErr := func(l *updateLocator, msg string) error {
return fmt.Errorf("unmatched records found under <nodePub=%x"+
", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
l.chainHash, l.fundingOutpoint, msg)
}
// cb is the callback function to be used when iterating the buckets.
cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
// Read both the old and new revocation log buckets.
oldBucket := chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
// Exit early if the old bucket is nil.
//
// NOTE: the new bucket may not be nil here as new logs might
// have been created using lnd@v0.15.0.
if oldBucket == nil {
return nil
}
// Return an error if the expected new bucket cannot be found.
if newBucket == nil {
return exitWithErr(l, "expected new bucket")
}
// Acquire the cursors.
oldCursor := oldBucket.ReadCursor()
newCursor := newBucket.ReadCursor()
// Jump to the end of the cursors to do a quick check.
newKey, _ := oldCursor.Last()
oldKey, _ := newCursor.Last()
// We expected the CommitHeights to be matched for nodes prior
// to v0.15.0.
if bytes.Equal(newKey, oldKey) {
return nil
}
// If the keys do not match, it's likely the node is running
// v0.15.0 and have new logs created. In this case, we will
// validate that every record in the old bucket can be found in
// the new bucket.
oldKey, _ = oldCursor.First()
for {
// Try to locate the old key in the new bucket and we
// expect it to be found.
newKey, _ := newCursor.Seek(oldKey)
// If the old key is not found in the new bucket,
// return an error.
//
// NOTE: because Seek will return the next key when the
// passed key cannot be found, we need to compare the
// keys to deicde whether the old key is found or not.
if !bytes.Equal(newKey, oldKey) {
errMsg := fmt.Sprintf("old bucket has "+
"CommitHeight=%v cannot be found in "+
"new bucket", oldKey)
return exitWithErr(l, errMsg)
}
// Otherwise, keep iterating the old bucket.
oldKey, _ = oldCursor.Next()
// If we've done iterating, all keys have been matched
// and we can safely exit.
if oldKey == nil {
return nil
}
}
}
return iterateBuckets(openChanBucket, nil, cb)
}

View File

@@ -116,6 +116,76 @@ func TestMigrateRevocationLog(t *testing.T) {
}
}
// TestValidateMigration checks that the function `validateMigration` behaves
// as expected.
func TestValidateMigration(t *testing.T) {
c := createTestChannel(nil)
testCases := []struct {
name string
setup func(db kvdb.Backend) error
expectFail bool
}{
{
// Finished prior to v0.15.0.
name: "valid migration",
setup: func(db kvdb.Backend) error {
return createFinished(db, c, true)
},
expectFail: false,
},
{
// Finished after to v0.15.0.
name: "valid migration after v0.15.0",
setup: func(db kvdb.Backend) error {
return createFinished(db, c, false)
},
expectFail: false,
},
{
// Missing logs prior to v0.15.0.
name: "invalid migration",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c, true)
},
expectFail: true,
},
{
// Missing logs after to v0.15.0.
name: "invalid migration after v0.15.0",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c, false)
},
expectFail: true,
},
}
for _, tc := range testCases {
tc := tc
// Create a test db.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err, "failed to create test db")
t.Run(tc.name, func(t *testing.T) {
// Setup test logs.
err := tc.setup(cdb)
require.NoError(t, err, "failed to setup")
// Call the actual function and check the error is
// returned as expected.
err = kvdb.Update(cdb, validateMigration, func() {})
if tc.expectFail {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
// createTwoChannels creates two channels that have the same chainHash and
// IdentityPub, simulating having two channels under the same peer.
func createTwoChannels() (*mig26.OpenChannel, *mig26.OpenChannel) {
@@ -186,7 +256,7 @@ func buildChannelCases(c *mig26.OpenChannel,
case3 := &channelTestCase{
name: "finished migration",
setup: func(db kvdb.Backend) error {
return createFinished(db, c)
return createFinished(db, c, true)
},
asserter: func(t *testing.T, db kvdb.Backend) {
// Check that the old bucket is removed.
@@ -227,7 +297,7 @@ func buildChannelCases(c *mig26.OpenChannel,
case4 := &channelTestCase{
name: "unfinished migration",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c)
return createNotFinished(db, c, true)
},
asserter: func(t *testing.T, db kvdb.Backend) {
// Check that the old bucket is removed.
@@ -259,7 +329,7 @@ func buildChannelCases(c *mig26.OpenChannel,
case5 := &channelTestCase{
name: "initial migration",
setup: func(db kvdb.Backend) error {
return createNotStarted(db, c)
return createNotStarted(db, c, true)
},
asserter: assertNewLogs,
unfinished: true,

View File

@@ -176,6 +176,19 @@ var (
},
}
// newLog3 defines an revocation log that's been created after v0.15.0.
newLog3 = mig.ChannelCommitment{
CommitHeight: logHeight2 + 1,
LocalLogIndex: 1,
LocalHtlcIndex: 1,
RemoteLogIndex: 0,
RemoteHtlcIndex: 0,
LocalBalance: lnwire.MilliSatoshi(888_800_000),
RemoteBalance: 0,
CommitTx: commitTx2,
Htlcs: []mig.HTLC{htlc},
}
// The following public keys are taken from the itest results.
localMusigKey, _ = btcec.ParsePubKey([]byte{
0x2,
@@ -489,27 +502,53 @@ func createTestStore() (shachain.Store, error) {
}
// createNotStarted will setup a situation where we haven't started the
// migration for the channel.
func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel) error {
// migration for the channel. We use the legacy to denote whether to simulate a
// node with v0.15.0.
func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
var newLogs []mig.ChannelCommitment
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
return setupTestLogs(cdb, c, oldLogs, nil)
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = []mig.ChannelCommitment{newLog3}
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}
// createNotFinished will setup a situation where we have un-migrated logs and
// return the next migration height.
func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel) error {
// return the next migration height. We use the legacy to denote whether to
// simulate a node with v0.15.0.
func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
newLogs := []mig.ChannelCommitment{oldLog1}
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = append(newLogs, newLog3)
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}
// createFinished will setup a situation where all the old logs have been
// migrated and return a nil.
func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel) error {
// migrated and return a nil. We use the legacy to denote whether to simulate a
// node with v0.15.0.
func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
newLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = append(newLogs, newLog3)
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}