diff --git a/channeldb/migration30/iterator.go b/channeldb/migration30/iterator.go index b5d2c016f..d4e08cbf4 100644 --- a/channeldb/migration30/iterator.go +++ b/channeldb/migration30/iterator.go @@ -2,6 +2,7 @@ package migration30 import ( "bytes" + "encoding/binary" "errors" "fmt" @@ -71,7 +72,7 @@ func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) ( // findNextMigrateHeight finds the next commit height that's not migrated. It // returns the commit height bytes found. A nil return value means the // migration has been completed for this particular channel bucket. -func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) { +func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte { // Read the old log bucket. The old bucket doesn't exist, indicating // either we don't have any old logs for this channel, or the migration // has been finished and the old bucket has been deleted. @@ -79,7 +80,7 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) { revocationLogBucketDeprecated, ) if oldBucket == nil { - return nil, nil + return nil } // Acquire a read cursor for the old bucket. @@ -92,7 +93,7 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) { logBucket := chanBucket.NestedReadBucket(revocationLogBucket) if logBucket == nil { nextHeight, _ := oldCursor.First() - return nextHeight, nil + return nextHeight } // Acquire a read cursor for the new bucket. @@ -100,37 +101,111 @@ func findNextMigrateHeight(chanBucket kvdb.RwBucket) ([]byte, error) { // Read the last migrated record. If the key is nil, we haven't // migrated any logs yet. In this case we return the first commit - // height found from the old revocation log bucket. + // height found from the old revocation log bucket. For instance, + // - old log: [1, 2] + // - new log: [] + // We will return the first key [1]. migratedHeight, _ := cursor.Last() if migratedHeight == nil { nextHeight, _ := oldCursor.First() - return nextHeight, nil + return nextHeight } - // Read the last height from the old log bucket. If the height of the - // last old revocation equals to the migrated height, we've done - // migrating for this channel. + // Read the last height from the old log bucket. endHeight, _ := oldCursor.Last() - if bytes.Equal(migratedHeight, endHeight) { - return nil, nil + + switch bytes.Compare(migratedHeight, endHeight) { + // If the height of the last old revocation equals to the migrated + // height, we've done migrating for this channel. For instance, + // - old log: [1, 2] + // - new log: [1, 2] + case 0: + return nil + + // If the migrated height is smaller, it means this is a resumed + // migration. In this case we will return the next height found in the + // old bucket. For instance, + // - old log: [1, 2] + // - new log: [1] + // We will return the key [2]. + case -1: + // Now point the cursor to the migratedHeight. If we cannot + // find this key from the old log bucket, the database might be + // corrupted. In this case, we would return the first key so + // that we would redo the migration for this chan bucket. + matchedHeight, _ := oldCursor.Seek(migratedHeight) + + // NOTE: because Seek will return the next key when the passed + // key cannot be found, we need to compare the `matchedHeight` + // to decide whether `migratedHeight` is found or not. + if !bytes.Equal(matchedHeight, migratedHeight) { + log.Warnf("Old revocation bucket doesn't have "+ + "CommitHeight=%v yet it's found in the new "+ + "bucket. It's likely the new revocation log "+ + "bucket is corrupted. Migrations will be"+ + "applied again.", + binary.BigEndian.Uint64(migratedHeight)) + + // Now return the first height found in the old bucket + // so we can redo the migration. + nextHeight, _ := oldCursor.First() + return nextHeight + } + + // Otherwise, find the next height to be migrated. + nextHeight, _ := oldCursor.Next() + return nextHeight + + // If the migrated height is greater, it means this node has new logs + // saved after v0.15.0. In this case, we need to further decide whether + // the old logs have been migrated or not. + case 1: } - // Now point the cursor to the migratedHeight. If we cannot find this - // key from the old log bucket, the database might be corrupted. In - // this case, we would return the first key so that we would redo the - // migration for this chan bucket. - matchedHeight, _ := oldCursor.Seek(migratedHeight) - if matchedHeight == nil { - // Now return the first height found in the old bucket so we - // can redo the migration. - nextHeight, _ := oldCursor.First() - return nextHeight, nil + // If we ever reached here, it means we have a mixed of new and old + // logs saved. Suppose we have old logs as, + // - old log: [1, 2] + // We'd have four possible scenarios, + // - new log: [ 3, 4] <- no migration happened, return [1]. + // - new log: [1, 3, 4] <- resumed migration, return [2]. + // - new log: [ 2, 3, 4] <- corrupted migration, return [1]. + // - new log: [1, 2, 3, 4] <- finished migration, return nil. + // To find the next migration height, we will iterate the old logs to + // grab the heights and query them in the new bucket until an height + // cannot be found, which is our next migration height. Or, if the old + // heights can all be found, it indicates a finished migration. + + // Move the cursor to the first record. + oldKey, _ := oldCursor.First() + + // NOTE: this action can be time-consuming as we are iterating the + // records and compare them. However, we would only ever hit here if + // this is a resumed migration with new logs created after v.0.15.0. + for { + // Try to locate the old key in the new bucket. If it cannot be + // found, it will be the next migrate height. + newKey, _ := cursor.Seek(oldKey) + + // If the old key is not found in the new bucket, return it as + // our next migration height. + // + // NOTE: because Seek will return the next key when the passed + // key cannot be found, we need to compare the keys to deicde + // whether the old key is found or not. + if !bytes.Equal(newKey, oldKey) { + return oldKey + } + + // Otherwise, keep iterating the old bucket. + oldKey, _ = oldCursor.Next() + + // If we've done iterating, yet all the old keys can be found + // in the new bucket, this means the migration has been + // finished. + if oldKey == nil { + return nil + } } - - // Otherwise, find the next height to be migrated. - nextHeight, _ := oldCursor.Next() - - return nextHeight, nil } // locateNextUpdateNum returns a locator that's used to start our migration. A @@ -142,10 +217,7 @@ func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) { cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error { locator = l - updateNum, err := findNextMigrateHeight(chanBucket) - if err != nil { - return err - } + updateNum := findNextMigrateHeight(chanBucket) // We've found the next commit height and can now exit. if updateNum != nil { diff --git a/channeldb/migration30/iterator_test.go b/channeldb/migration30/iterator_test.go index 14c76a916..7593124f5 100644 --- a/channeldb/migration30/iterator_test.go +++ b/channeldb/migration30/iterator_test.go @@ -154,8 +154,8 @@ func TestFindNextMigrateHeight(t *testing.T) { return err } - height, err = findNextMigrateHeight(chanBucket) - return err + height = findNextMigrateHeight(chanBucket) + return nil }, func() {}) require.NoError(t, err) @@ -175,10 +175,10 @@ func TestFindNextMigrateHeight(t *testing.T) { expectedHeight: nil, }, { - // When we don't have any new logs, our next migration - // height would be the first height found in the old - // logs. - name: "empty new logs", + // When we don't have any migrated logs, our next + // migration height would be the first height found in + // the old logs. + name: "empty migrated logs", oldLogs: []mig.ChannelCommitment{ createDummyChannelCommit(1), createDummyChannelCommit(2), @@ -186,10 +186,10 @@ func TestFindNextMigrateHeight(t *testing.T) { expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, }, { - // When we have new logs, the next migration height - // should be the first height found in the old logs but - // not in the new logs. - name: "have new logs", + // When we have migrated logs, the next migration + // height should be the first height found in the old + // logs but not in the migrated logs. + name: "have migrated logs", oldLogs: []mig.ChannelCommitment{ createDummyChannelCommit(1), createDummyChannelCommit(2), @@ -200,10 +200,10 @@ func TestFindNextMigrateHeight(t *testing.T) { expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2}, }, { - // When both the logs have equal length, the next + // When both the logs have equal indexes, the next // migration should be nil as we've finished migrating // for this bucket. - name: "have equal logs", + name: "have finished logs", oldLogs: []mig.ChannelCommitment{ createDummyChannelCommit(1), createDummyChannelCommit(2), @@ -215,10 +215,45 @@ func TestFindNextMigrateHeight(t *testing.T) { expectedHeight: nil, }, { - // When the lastest height found from the new logs is - // ahead of the old logs, we still return the old log's - // height. - name: "corrupted logs", + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we don't have any migrated logs, the + // next migration height should be the first height + // found in the old bucket. + name: "have new logs but no migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have migrated logs, the returned + // value should be the next un-migrated height. + name: "have new logs and migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2}, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have corrupted logs, the returned + // value should be the first height in the old bucket. + name: "have new logs but missing logs", oldLogs: []mig.ChannelCommitment{ createDummyChannelCommit(1), createDummyChannelCommit(2), @@ -226,9 +261,28 @@ func TestFindNextMigrateHeight(t *testing.T) { newLogs: []mig.ChannelCommitment{ createDummyChannelCommit(2), createDummyChannelCommit(3), + createDummyChannelCommit(4), }, expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have finished the migration, we + // expect a nil height to be returned. + name: "have new logs and finished logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: nil, + }, } for _, tc := range testCases { diff --git a/channeldb/migration30/migration.go b/channeldb/migration30/migration.go index bc4317b1d..afc1686a6 100644 --- a/channeldb/migration30/migration.go +++ b/channeldb/migration30/migration.go @@ -243,38 +243,33 @@ func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) { // total is the number of total records. total uint64 - // migrated is the number of already migrated records. - migrated uint64 - // unmigrated is the number of unmigrated records. unmigrated uint64 ) err = kvdb.Update(db, func(tx kvdb.RwTx) error { - total, unmigrated, migrated, err = fetchLogStats(tx) + total, unmigrated, err = fetchLogStats(tx) return err }, func() {}) - log.Debugf("Total logs=%d, migrated=%d, unmigrated=%d", total, migrated, - unmigrated) - return total, migrated, err + log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated) + return total, total - unmigrated, err } // fetchLogStats iterates all the chan buckets to provide stats about the logs. -// The returned values are num of total records, num of un-migrated records, -// and num of migrated records. -func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) { +// The returned values are num of total records, and num of un-migrated +// records. +func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) { var ( total uint64 totalUnmigrated uint64 - totalMigrated uint64 ) openChanBucket := tx.ReadWriteBucket(openChannelBucket) // If no bucket is found, we can exit early. if openChanBucket == nil { - return 0, 0, 0, fmt.Errorf("root bucket not found") + return 0, 0, fmt.Errorf("root bucket not found") } // counter is a helper closure used to count the number of records @@ -317,19 +312,10 @@ func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) { return nil } - // countMigrated is a callback function used to count the total number - // of migrated records. - countMigrated := func(chanBucket kvdb.RwBucket, - l *updateLocator) error { - - totalMigrated += counter(chanBucket, revocationLogBucket) - return nil - } - // Locate the next migration height. locator, err := locateNextUpdateNum(openChanBucket) if err != nil { - return 0, 0, 0, fmt.Errorf("locator got error: %v", err) + return 0, 0, fmt.Errorf("locator got error: %v", err) } // If the returned locator is not nil, we still have un-migrated @@ -338,20 +324,17 @@ func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, uint64, error) { if locator != nil { err = iterateBuckets(openChanBucket, locator, countUnmigrated) if err != nil { - return 0, 0, 0, err + return 0, 0, err } } // Count the total number of records by supplying a nil locator. err = iterateBuckets(openChanBucket, nil, countTotal) if err != nil { - return 0, 0, 0, err + return 0, 0, err } - // Count the total number of already migrated records by supplying a - // nil locator. - err = iterateBuckets(openChanBucket, nil, countMigrated) - return total, totalUnmigrated, totalMigrated, err + return total, totalUnmigrated, err } // logEntry houses the info needed to write a new revocation log.