Merge pull request #6469 from yyforyongyu/5388-migration

channeldb: add optional migration to prune revocation logs
This commit is contained in:
Olaoluwa Osuntokun
2022-08-04 17:15:08 -07:00
committed by GitHub
20 changed files with 4181 additions and 18 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration26"
"github.com/lightningnetwork/lnd/channeldb/migration27"
"github.com/lightningnetwork/lnd/channeldb/migration29"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
@@ -45,17 +46,34 @@ var (
// up-to-date version of the database.
type migration func(tx kvdb.RwTx) error
type version struct {
// mandatoryVersion defines a db version that must be applied before the lnd
// starts.
type mandatoryVersion struct {
number uint32
migration migration
}
// optionalMigration defines an optional migration function. When a migration
// is optional, it usually involves a large scale of changes that might touch
// millions of keys. Due to OOM concern, the update cannot be safely done
// within one db transaction. Thus, for optional migrations, they must take the
// db backend and construct transactions as needed.
type optionalMigration func(db kvdb.Backend) error
// optionalVersion defines a db version that can be optionally applied. When
// applying migrations, we must apply all the mandatory migrations first before
// attempting optional ones.
type optionalVersion struct {
name string
migration optionalMigration
}
var (
// dbVersions is storing all versions of database. If current version
// of database don't match with latest version this list will be used
// for retrieving all migration function that are need to apply to the
// current db.
dbVersions = []version{
// dbVersions is storing all mandatory versions of database. If current
// version of database don't match with latest version this list will
// be used for retrieving all migration function that are need to apply
// to the current db.
dbVersions = []mandatoryVersion{
{
// The base DB version requires no migration.
number: 0,
@@ -237,6 +255,19 @@ var (
},
}
// optionalVersions stores all optional migrations that are applied
// after dbVersions.
//
// NOTE: optional migrations must be fault-tolerant and re-run already
// migrated data must be noop, which means the migration must be able
// to determine its state.
optionalVersions = []optionalVersion{
{
name: "prune revocation log",
migration: migration30.MigrateRevocationLog,
},
}
// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
@@ -337,6 +368,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
backend.Close()
return nil, err
}
// Grab the optional migration config.
omc := opts.OptionalMiragtionConfig
if err := chanDB.applyOptionalVersions(omc); err != nil {
backend.Close()
return nil, err
}
}
return chanDB, nil
@@ -1309,7 +1347,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func (d *DB) syncVersions(versions []version) error {
func (d *DB) syncVersions(versions []mandatoryVersion) error {
meta, err := d.FetchMeta(nil)
if err != nil {
if err == ErrMetaNotFound {
@@ -1379,6 +1417,69 @@ func (d *DB) syncVersions(versions []version) error {
}, func() {})
}
// applyOptionalVersions takes a config to determine whether the optional
// migrations will be applied.
//
// NOTE: only support the prune_revocation_log optional migration atm.
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
// TODO(yy): need to design the db to support dry run for optional
// migrations.
if d.dryRun {
log.Info("Skipped optional migrations as dry run mode is not " +
"supported yet")
return nil
}
om, err := d.fetchOptionalMeta()
if err != nil {
if err == ErrMetaNotFound {
om = &OptionalMeta{
Versions: make(map[uint64]string),
}
} else {
return err
}
}
log.Infof("Checking for optional update: prune_revocation_log=%v, "+
"db_version=%s", cfg.PruneRevocationLog, om)
// Exit early if the optional migration is not specified.
if !cfg.PruneRevocationLog {
return nil
}
// Exit early if the optional migration has already been applied.
if _, ok := om.Versions[0]; ok {
return nil
}
// Get the optional version.
version := optionalVersions[0]
log.Infof("Performing database optional migration: %s", version.name)
// Migrate the data.
if err := version.migration(d); err != nil {
log.Errorf("Unable to apply optional migration: %s, error: %v",
version.name, err)
return err
}
// Update the optional meta. Notice that unlike the mandatory db
// migrations where we perform the migration and updating meta in a
// single db transaction, we use different transactions here. Even when
// the following update is failed, we should be fine here as we would
// re-run the optional migration again, which is a noop, during next
// startup.
om.Versions[0] = version.name
if err := d.putOptionalMeta(om); err != nil {
log.Errorf("Unable to update optional meta: %v", err)
return err
}
return nil
}
// ChannelGraph returns the current instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return d.graph
@@ -1390,13 +1491,15 @@ func (d *DB) ChannelStateDB() *ChannelStateDB {
return d.channelStateDB
}
func getLatestDBVersion(versions []version) uint32 {
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
return versions[len(versions)-1].number
}
// getMigrationsToApply retrieves the migration function that should be
// applied to the database.
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
func getMigrationsToApply(versions []mandatoryVersion,
version uint32) ([]migration, []uint32) {
migrations := make([]migration, 0, len(versions))
migrationVersions := make([]uint32, 0, len(versions))

View File

@@ -8,6 +8,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration13"
"github.com/lightningnetwork/lnd/channeldb/migration16"
"github.com/lightningnetwork/lnd/channeldb/migration24"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
@@ -38,5 +39,6 @@ func UseLogger(logger btclog.Logger) {
migration13.UseLogger(logger)
migration16.UseLogger(logger)
migration24.UseLogger(logger)
migration30.UseLogger(logger)
kvdb.UseLogger(logger)
}

View File

@@ -1,7 +1,11 @@
package channeldb
import (
"bytes"
"fmt"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)
var (
@@ -12,6 +16,10 @@ var (
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// current database version.
dbVersionKey = []byte("dbp")
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// a list of optional migrations that have been applied.
optionalVersionKey = []byte("ovk")
)
// Meta structure holds the database meta information.
@@ -80,3 +88,92 @@ func putDbVersion(metaBucket kvdb.RwBucket, meta *Meta) error {
byteOrder.PutUint32(scratch, meta.DbVersionNumber)
return metaBucket.Put(dbVersionKey, scratch)
}
// OptionalMeta structure holds the database optional migration information.
type OptionalMeta struct {
// Versions is a set that contains the versions that have been applied.
// When saved to disk, only the indexes are stored.
Versions map[uint64]string
}
func (om *OptionalMeta) String() string {
s := ""
for index, name := range om.Versions {
s += fmt.Sprintf("%d: %s", index, name)
}
if s == "" {
s = "empty"
}
return s
}
// fetchOptionalMeta reads the optional meta from the database.
func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
om := &OptionalMeta{
Versions: make(map[uint64]string),
}
err := kvdb.View(d, func(tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
}
vBytes := metaBucket.Get(optionalVersionKey)
// Exit early if nothing found.
if vBytes == nil {
return nil
}
// Read the versions' length.
r := bytes.NewReader(vBytes)
vLen, err := tlv.ReadVarInt(r, &[8]byte{})
if err != nil {
return err
}
// Write the version index.
for i := uint64(0); i < vLen; i++ {
version, err := tlv.ReadVarInt(r, &[8]byte{})
if err != nil {
return err
}
om.Versions[version] = optionalVersions[i].name
}
return nil
}, func() {})
if err != nil {
return nil, err
}
return om, nil
}
// fetchOptionalMeta writes an optional meta to the database.
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
if err != nil {
return err
}
var b bytes.Buffer
// Write the total length.
err = tlv.WriteVarInt(&b, uint64(len(om.Versions)), &[8]byte{})
if err != nil {
return err
}
// Write the version indexes.
for v := range om.Versions {
err := tlv.WriteVarInt(&b, v, &[8]byte{})
if err != nil {
return err
}
}
return metaBucket.Put(optionalVersionKey, b.Bytes())
}, func() {})
}

View File

@@ -44,7 +44,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
t.Fatalf("unable to store meta data: %v", err)
}
versions := []version{
versions := []mandatoryVersion{
{
number: 0,
migration: nil,
@@ -124,7 +124,7 @@ func TestOrderOfMigrations(t *testing.T) {
t.Parallel()
appliedMigration := -1
versions := []version{
versions := []mandatoryVersion{
{0, nil},
{1, nil},
{2, func(tx kvdb.RwTx) error {
@@ -498,3 +498,85 @@ func TestMigrationDryRun(t *testing.T) {
true,
true)
}
// TestOptionalMeta checks the basic read and write for the optional meta.
func TestOptionalMeta(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)
// Test read an empty optional meta.
om, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Empty(t, om.Versions, "expected empty versions")
// Test write an optional meta.
om = &OptionalMeta{
Versions: map[uint64]string{
0: optionalVersions[0].name,
},
}
err = db.putOptionalMeta(om)
require.NoError(t, err, "error putting optional meta")
om1, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Equal(t, om, om1, "unexpected empty versions")
require.Equal(t, "0: prune revocation log", om.String())
}
// TestApplyOptionalVersions checks that the optional migration is applied as
// expected based on the config.
func TestApplyOptionalVersions(t *testing.T) {
t.Parallel()
db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)
// Overwrite the migration function so we can count how many times the
// migration has happened.
migrateCount := 0
optionalVersions[0].migration = func(_ kvdb.Backend) error {
migrateCount++
return nil
}
// Test that when the flag is false, no migration happens.
cfg := OptionalMiragtionConfig{}
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 0, migrateCount, "expected no migration")
// Check the optional meta is not updated.
om, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Empty(t, om.Versions, "expected empty versions")
// Test that when specified, the optional migration is applied.
cfg.PruneRevocationLog = true
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected migration")
// Fetch the updated optional meta.
om, err = db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
// Verify that the optional meta is updated as expected.
omExpected := &OptionalMeta{
Versions: map[uint64]string{
0: optionalVersions[0].name,
},
}
require.Equal(t, omExpected, om, "unexpected empty versions")
// Test that though specified, the optional migration is not run since
// it's already been applied.
cfg.PruneRevocationLog = true
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected no migration")
}

View File

@@ -0,0 +1,338 @@
package migration30
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// openChanBucket stores all the currently open channels. This bucket
// has a second, nested bucket which is keyed by a node's ID. Within
// that node ID bucket, all attributes required to track, update, and
// close a channel are stored.
openChannelBucket = []byte("open-chan-bucket")
// errExit is returned when the callback function used in iterator
// needs to exit the iteration.
errExit = errors.New("exit condition met")
)
// updateLocator defines a locator that can be used to find the next record to
// be migrated. This is useful when an interrupted migration that leads to a
// mixed revocation log formats saved in our database, we can then restart the
// migration using the locator to continue migrating the rest.
type updateLocator struct {
// nodePub, chainHash and fundingOutpoint are used to locate the
// channel bucket.
nodePub []byte
chainHash []byte
fundingOutpoint []byte
// nextHeight is used to locate the next old revocation log to be
// migrated. A nil value means we've finished the migration.
nextHeight []byte
}
// fetchChanBucket is a helper function that returns the bucket where a
// channel's data resides in given: the public key for the node, the outpoint,
// and the chainhash that the channel resides on.
func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) (
kvdb.RwBucket, error) {
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodeChanBucket := rootBucket.NestedReadWriteBucket(ul.nodePub)
if nodeChanBucket == nil {
return nil, mig25.ErrNoActiveChannels
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket := nodeChanBucket.NestedReadWriteBucket(ul.chainHash)
if chainBucket == nil {
return nil, mig25.ErrNoActiveChannels
}
// With the bucket for the node and chain fetched, we can now go down
// another level, for this channel itself.
chanBucket := chainBucket.NestedReadWriteBucket(ul.fundingOutpoint)
if chanBucket == nil {
return nil, mig25.ErrChannelNotFound
}
return chanBucket, nil
}
// findNextMigrateHeight finds the next commit height that's not migrated. It
// returns the commit height bytes found. A nil return value means the
// migration has been completed for this particular channel bucket.
func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte {
// Read the old log bucket. The old bucket doesn't exist, indicating
// either we don't have any old logs for this channel, or the migration
// has been finished and the old bucket has been deleted.
oldBucket := chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
if oldBucket == nil {
return nil
}
// Acquire a read cursor for the old bucket.
oldCursor := oldBucket.ReadCursor()
// Read the new log bucket. The sub-bucket hasn't been created yet,
// indicating we haven't migrated any logs under this channel. In this
// case, we'll return the first commit height found from the old
// revocation log bucket as the next height.
logBucket := chanBucket.NestedReadBucket(revocationLogBucket)
if logBucket == nil {
nextHeight, _ := oldCursor.First()
return nextHeight
}
// Acquire a read cursor for the new bucket.
cursor := logBucket.ReadCursor()
// Read the last migrated record. If the key is nil, we haven't
// migrated any logs yet. In this case we return the first commit
// height found from the old revocation log bucket. For instance,
// - old log: [1, 2]
// - new log: []
// We will return the first key [1].
migratedHeight, _ := cursor.Last()
if migratedHeight == nil {
nextHeight, _ := oldCursor.First()
return nextHeight
}
// Read the last height from the old log bucket.
endHeight, _ := oldCursor.Last()
switch bytes.Compare(migratedHeight, endHeight) {
// If the height of the last old revocation equals to the migrated
// height, we've done migrating for this channel. For instance,
// - old log: [1, 2]
// - new log: [1, 2]
case 0:
return nil
// If the migrated height is smaller, it means this is a resumed
// migration. In this case we will return the next height found in the
// old bucket. For instance,
// - old log: [1, 2]
// - new log: [1]
// We will return the key [2].
case -1:
// Now point the cursor to the migratedHeight. If we cannot
// find this key from the old log bucket, the database might be
// corrupted. In this case, we would return the first key so
// that we would redo the migration for this chan bucket.
matchedHeight, _ := oldCursor.Seek(migratedHeight)
// NOTE: because Seek will return the next key when the passed
// key cannot be found, we need to compare the `matchedHeight`
// to decide whether `migratedHeight` is found or not.
if !bytes.Equal(matchedHeight, migratedHeight) {
log.Warnf("Old revocation bucket doesn't have "+
"CommitHeight=%v yet it's found in the new "+
"bucket. It's likely the new revocation log "+
"bucket is corrupted. Migrations will be"+
"applied again.",
binary.BigEndian.Uint64(migratedHeight))
// Now return the first height found in the old bucket
// so we can redo the migration.
nextHeight, _ := oldCursor.First()
return nextHeight
}
// Otherwise, find the next height to be migrated.
nextHeight, _ := oldCursor.Next()
return nextHeight
// If the migrated height is greater, it means this node has new logs
// saved after v0.15.0. In this case, we need to further decide whether
// the old logs have been migrated or not.
case 1:
}
// If we ever reached here, it means we have a mixed of new and old
// logs saved. Suppose we have old logs as,
// - old log: [1, 2]
// We'd have four possible scenarios,
// - new log: [ 3, 4] <- no migration happened, return [1].
// - new log: [1, 3, 4] <- resumed migration, return [2].
// - new log: [ 2, 3, 4] <- corrupted migration, return [1].
// - new log: [1, 2, 3, 4] <- finished migration, return nil.
// To find the next migration height, we will iterate the old logs to
// grab the heights and query them in the new bucket until an height
// cannot be found, which is our next migration height. Or, if the old
// heights can all be found, it indicates a finished migration.
// Move the cursor to the first record.
oldKey, _ := oldCursor.First()
// NOTE: this action can be time-consuming as we are iterating the
// records and compare them. However, we would only ever hit here if
// this is a resumed migration with new logs created after v.0.15.0.
for {
// Try to locate the old key in the new bucket. If it cannot be
// found, it will be the next migrate height.
newKey, _ := cursor.Seek(oldKey)
// If the old key is not found in the new bucket, return it as
// our next migration height.
//
// NOTE: because Seek will return the next key when the passed
// key cannot be found, we need to compare the keys to deicde
// whether the old key is found or not.
if !bytes.Equal(newKey, oldKey) {
return oldKey
}
// Otherwise, keep iterating the old bucket.
oldKey, _ = oldCursor.Next()
// If we've done iterating, yet all the old keys can be found
// in the new bucket, this means the migration has been
// finished.
if oldKey == nil {
return nil
}
}
}
// locateNextUpdateNum returns a locator that's used to start our migration. A
// nil locator means the migration has been finished.
func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) {
locator := &updateLocator{}
// cb is the callback function to be used when iterating the buckets.
cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
locator = l
updateNum := findNextMigrateHeight(chanBucket)
// We've found the next commit height and can now exit.
if updateNum != nil {
locator.nextHeight = updateNum
return errExit
}
return nil
}
// Iterate the buckets. If we received an exit signal, return the
// locator.
err := iterateBuckets(openChanBucket, nil, cb)
if err == errExit {
log.Debugf("found locator: nodePub=%x, fundingOutpoint=%x, "+
"nextHeight=%x", locator.nodePub, locator.chainHash,
locator.nextHeight)
return locator, nil
}
// If the err is nil, we've iterated all the sub-buckets and the
// migration is finished.
return nil, err
}
// callback defines a type that's used by the iterator.
type callback func(k, v []byte) error
// iterator is a helper function that iterates a given bucket and performs the
// callback function on each key. If a seeker is specified, it will move the
// cursor to the given position otherwise it will start from the first item.
func iterator(bucket kvdb.RBucket, seeker []byte, cb callback) error {
c := bucket.ReadCursor()
k, v := c.First()
// Move the cursor to the specified position if seeker is non-nil.
if seeker != nil {
k, v = c.Seek(seeker)
}
// Start the iteration and exit on condition.
for k, v := k, v; k != nil; k, v = c.Next() {
// cb might return errExit to signal exiting the iteration.
if err := cb(k, v); err != nil {
return err
}
}
return nil
}
// step defines the callback type that's used when iterating the buckets.
type step func(bucket kvdb.RwBucket, l *updateLocator) error
// iterateBuckets locates the cursor at a given position specified by the
// updateLocator and starts the iteration. If a nil locator is passed, it will
// start the iteration from the beginning. During each iteration, the callback
// function is called and it may exit the iteration when the callback returns
// an errExit to signal an exit condition.
func iterateBuckets(openChanBucket kvdb.RwBucket,
l *updateLocator, cb step) error {
// If the locator is nil, we will initiate an empty one, which is
// further used by the iterator.
if l == nil {
l = &updateLocator{}
}
// iterChanBucket iterates the chain bucket to act on each of the
// channel buckets.
iterChanBucket := func(chain kvdb.RwBucket,
k1, k2, _ []byte, cb step) error {
return iterator(
chain, l.fundingOutpoint,
func(k3, _ []byte) error {
// Read the sub-bucket level 3.
chanBucket := chain.NestedReadWriteBucket(k3)
if chanBucket == nil {
return fmt.Errorf("no bucket for "+
"chanPoint=%x", k3)
}
// Construct a new locator at this position.
locator := &updateLocator{
nodePub: k1,
chainHash: k2,
fundingOutpoint: k3,
}
// Set the seeker to nil so it won't affect
// other buckets.
l.fundingOutpoint = nil
return cb(chanBucket, locator)
})
}
return iterator(openChanBucket, l.nodePub, func(k1, v []byte) error {
// Read the sub-bucket level 1.
node := openChanBucket.NestedReadWriteBucket(k1)
if node == nil {
return fmt.Errorf("no bucket for node %x", k1)
}
return iterator(node, l.chainHash, func(k2, v []byte) error {
// Read the sub-bucket level 2.
chain := node.NestedReadWriteBucket(k2)
if chain == nil {
return fmt.Errorf("no bucket for chain=%x", k2)
}
// Set the seeker to nil so it won't affect other
// buckets.
l.chainHash = nil
return iterChanBucket(chain, k1, k2, v, cb)
})
})
}

View File

@@ -0,0 +1,697 @@
package migration30
import (
"bytes"
"testing"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
)
var (
testRefundTimeout = uint32(740_000)
testIncoming = true
testRHash = bytes.Repeat([]byte{1}, 32)
testOutputIndex = int32(0)
testHTLCAmt = lnwire.MilliSatoshi(1000_000)
testLocalAmt = btcutil.Amount(10_000)
testRemoteAmt = btcutil.Amount(20_000)
testTx = &wire.MsgTx{
Version: 1,
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
Sequence: 0xffffffff,
},
},
TxOut: []*wire.TxOut{
{Value: int64(testHTLCAmt.ToSatoshis())},
{Value: int64(testLocalAmt)},
{Value: int64(testRemoteAmt)},
},
LockTime: 5,
}
)
// TestLocateChanBucket checks that the updateLocator can successfully locate a
// chanBucket or returns an error.
func TestLocateChanBucket(t *testing.T) {
t.Parallel()
// Create test database.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err)
// Create a test channel.
c := createTestChannel(nil)
var buf bytes.Buffer
require.NoError(t, mig.WriteOutpoint(&buf, &c.FundingOutpoint))
// Prepare the info needed to query the bucket.
nodePub := c.IdentityPub.SerializeCompressed()
chainHash := c.ChainHash[:]
cp := buf.Bytes()
// Create test buckets.
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
_, err := mig25.CreateChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
return nil
}, func() {})
require.NoError(t, err)
// testLocator is a helper closure that tests a given locator's
// locateChanBucket method.
testLocator := func(l *updateLocator) error {
return kvdb.Update(cdb, func(tx kvdb.RwTx) error {
rootBucket := tx.ReadWriteBucket(openChannelBucket)
_, err := l.locateChanBucket(rootBucket)
return err
}, func() {})
}
testCases := []struct {
name string
locator *updateLocator
expectedErr error
}{
{
name: "empty node pub key",
locator: &updateLocator{},
expectedErr: mig25.ErrNoActiveChannels,
},
{
name: "empty chainhash",
locator: &updateLocator{
nodePub: nodePub,
},
expectedErr: mig25.ErrNoActiveChannels,
},
{
name: "empty funding outpoint",
locator: &updateLocator{
nodePub: nodePub,
chainHash: chainHash,
},
expectedErr: mig25.ErrChannelNotFound,
},
{
name: "successful query",
locator: &updateLocator{
nodePub: nodePub,
chainHash: chainHash,
fundingOutpoint: cp,
},
expectedErr: nil,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
err := testLocator(tc.locator)
require.Equal(t, tc.expectedErr, err)
})
}
}
// TestFindNextMigrateHeight checks that given a channel bucket, we can
// successfully find the next un-migrated commit height.
func TestFindNextMigrateHeight(t *testing.T) {
t.Parallel()
// Create test database.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err)
// tester is a helper closure that finds the next migration height.
tester := func(c *mig26.OpenChannel) []byte {
var height []byte
err := kvdb.Update(cdb, func(tx kvdb.RwTx) error {
chanBucket, err := mig25.FetchChanBucket(
tx, &c.OpenChannel,
)
if err != nil {
return err
}
height = findNextMigrateHeight(chanBucket)
return nil
}, func() {})
require.NoError(t, err)
return height
}
testCases := []struct {
name string
oldLogs []mig.ChannelCommitment
newLogs []mig.ChannelCommitment
expectedHeight []byte
}{
{
// When we don't have any old logs, our next migration
// height would be nil.
name: "empty old logs",
expectedHeight: nil,
},
{
// When we don't have any migrated logs, our next
// migration height would be the first height found in
// the old logs.
name: "empty migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
},
{
// When we have migrated logs, the next migration
// height should be the first height found in the old
// logs but not in the migrated logs.
name: "have migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2},
},
{
// When both the logs have equal indexes, the next
// migration should be nil as we've finished migrating
// for this bucket.
name: "have finished logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
expectedHeight: nil,
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we don't have any migrated logs, the
// next migration height should be the first height
// found in the old bucket.
name: "have new logs but no migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have migrated logs, the returned
// value should be the next un-migrated height.
name: "have new logs and migrated logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2},
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have corrupted logs, the returned
// value should be the first height in the old bucket.
name: "have new logs but missing logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(2),
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1},
},
{
// When there are new logs saved in the new bucket,
// which happens when the node is running with
// v.0.15.0, and we have finished the migration, we
// expect a nil height to be returned.
name: "have new logs and finished logs",
oldLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
},
newLogs: []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
createDummyChannelCommit(3),
createDummyChannelCommit(4),
},
expectedHeight: nil,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// Create a test channel.
c := createTestChannel(nil)
// Setup the database.
err := setupTestLogs(cdb, c, tc.oldLogs, tc.newLogs)
require.NoError(t, err)
// Run the test and check the expected next migration
// height is returned.
height := tester(c)
require.Equal(t, tc.expectedHeight, height)
})
}
}
// TestIterator checks that the iterator iterate the given bucket correctly.
func TestIterator(t *testing.T) {
t.Parallel()
// Create test database.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err)
// exitKey is used to signal exit when hitting this key.
exitKey := []byte{1}
// seekKey is used to position the cursor.
seekKey := []byte{2}
// endKey is the last key saved in the test bucket.
endKey := []byte{3}
// Create test bucket.
bucketName := []byte("test-bucket")
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(bucketName)
if err != nil {
return err
}
if err := bucket.Put(exitKey, testRHash); err != nil {
return err
}
if err := bucket.Put(seekKey, testRHash); err != nil {
return err
}
return bucket.Put(endKey, testRHash)
}, func() {})
require.NoError(t, err)
// tester is a helper closure that tests the iterator.
tester := func(seeker []byte, cb callback, expectedErr error) {
err := kvdb.View(cdb, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(bucketName)
return iterator(bucket, seeker, cb)
}, func() {})
// Check the err is returned as expected.
require.Equal(t, expectedErr, err)
}
// keysItered records the keys have been iterated.
keysItered := make([][]byte, 0)
// testCb creates a dummy callback that saves the keys it have
// iterated.
testCb := func(k, v []byte) error {
keysItered = append(keysItered, k)
if bytes.Equal(k, exitKey) {
return errExit
}
return nil
}
// Test that without a seeker, we would iterate from the beginning,
// which will end up iterating only one key since we would exit on it.
tester(nil, testCb, errExit)
require.Equal(t, [][]byte{exitKey}, keysItered)
// Reset the keys.
keysItered = make([][]byte, 0)
// Now test that when we use a seeker, we would start our iteration at
// the seeker posisiton. This means we won't exit it early since we've
// skipped the exitKey.
tester(seekKey, testCb, nil)
require.Equal(t, [][]byte{seekKey, endKey}, keysItered)
}
// TestIterateBuckets checks that we can successfully iterate the buckets and
// update the locator during the iteration.
func TestIterateBuckets(t *testing.T) {
t.Parallel()
// Create test database.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err)
// Create three test channels.
c1 := createTestChannel(nil)
c2 := createTestChannel(nil)
c3 := createTestChannel(nil)
// Create test buckets.
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
_, err := mig25.CreateChanBucket(tx, &c1.OpenChannel)
if err != nil {
return err
}
_, err = mig25.CreateChanBucket(tx, &c2.OpenChannel)
if err != nil {
return err
}
_, err = mig25.CreateChanBucket(tx, &c3.OpenChannel)
if err != nil {
return err
}
return nil
}, func() {})
require.NoError(t, err)
// testCb creates a dummy callback that saves the locator it received.
locators := make([]*updateLocator, 0)
testCb := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
locators = append(locators, l)
return nil
}
// Iterate the buckets with a nil locator.
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(openChannelBucket)
return iterateBuckets(bucket, nil, testCb)
}, func() {})
require.NoError(t, err)
// We should see three locators.
require.Len(t, locators, 3)
// We now test we can iterate the buckets using a locator.
//
// Copy the locator which points to the second channel.
locator := &updateLocator{
nodePub: locators[1].nodePub,
chainHash: locators[1].chainHash,
fundingOutpoint: locators[1].fundingOutpoint,
}
// Reset the locators.
locators = make([]*updateLocator, 0)
// Iterate the buckets with a locator.
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(openChannelBucket)
return iterateBuckets(bucket, locator, testCb)
}, func() {})
require.NoError(t, err)
// We should see two locators.
require.Len(t, locators, 2)
}
// TestLocalNextUpdateNum checks that we can successfully locate the next
// migration target record.
func TestLocalNextUpdateNum(t *testing.T) {
t.Parallel()
// assertLocator checks the locator has expected values in its fields.
assertLocator := func(t *testing.T, c *mig26.OpenChannel,
height []byte, l *updateLocator) {
var buf bytes.Buffer
require.NoError(
t, mig.WriteOutpoint(&buf, &c.FundingOutpoint),
)
// Prepare the info needed to validate the locator.
nodePub := c.IdentityPub.SerializeCompressed()
chainHash := c.ChainHash[:]
cp := buf.Bytes()
require.Equal(t, nodePub, l.nodePub, "wrong nodePub")
require.Equal(t, chainHash, l.chainHash, "wrong chainhash")
require.Equal(t, cp, l.fundingOutpoint, "wrong outpoint")
require.Equal(t, height, l.nextHeight, "wrong nextHeight")
}
// createTwoChannels is a helper closure that creates two testing
// channels and returns the channels sorted by their nodePub to match
// how they are stored in boltdb.
createTwoChannels := func() (*mig26.OpenChannel, *mig26.OpenChannel) {
c1 := createTestChannel(nil)
c2 := createTestChannel(nil)
// If c1 is greater than c2, boltdb will put c2 before c1.
if bytes.Compare(
c1.IdentityPub.SerializeCompressed(),
c2.IdentityPub.SerializeCompressed(),
) > 0 {
c1, c2 = c2, c1
}
return c1, c2
}
// createNotFinished will setup a situation where we have un-migrated
// logs and return the next migration height.
createNotFinished := func(cdb kvdb.Backend,
c *mig26.OpenChannel) []byte {
// Create test logs.
oldLogs := []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
}
newLogs := []mig.ChannelCommitment{
createDummyChannelCommit(1),
}
err := setupTestLogs(cdb, c, oldLogs, newLogs)
require.NoError(t, err)
return []byte{0, 0, 0, 0, 0, 0, 0, 2}
}
// createFinished will setup a situation where all the old logs have
// been migrated and return a nil.
createFinished := func(cdb kvdb.Backend, c *mig26.OpenChannel) []byte { // nolint:unparam
// Create test logs.
oldLogs := []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
}
newLogs := []mig.ChannelCommitment{
createDummyChannelCommit(1),
createDummyChannelCommit(2),
}
err := setupTestLogs(cdb, c, oldLogs, newLogs)
require.NoError(t, err)
return nil
}
// emptyChannel builds a test case where no channel buckets exist.
emptyChannel := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
// Create the root bucket.
err := setupTestLogs(cdb, nil, nil, nil)
require.NoError(t, err)
return nil, nil
}
// singleChannelNoLogs builds a test case where we have a single
// channel without any revocation logs.
singleChannelNoLogs := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
// Create a test channel.
c := createTestChannel(nil)
// Create test logs.
err := setupTestLogs(cdb, c, nil, nil)
require.NoError(t, err)
return c, nil
}
// singleChannelNotFinished builds a test case where we have a single
// channel and have unfinished old logs.
singleChannelNotFinished := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
c := createTestChannel(nil)
return c, createNotFinished(cdb, c)
}
// singleChannelFinished builds a test where we have a single channel
// and have finished all the migration.
singleChannelFinished := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
c := createTestChannel(nil)
return c, createFinished(cdb, c)
}
// twoChannelsNotFinished builds a test case where we have two channels
// and have unfinished old logs.
twoChannelsNotFinished := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
c1, c2 := createTwoChannels()
createFinished(cdb, c1)
return c2, createNotFinished(cdb, c2)
}
// twoChannelsFinished builds a test case where we have two channels
// and have finished the migration.
twoChannelsFinished := func(cdb kvdb.Backend) (
*mig26.OpenChannel, []byte) {
c1, c2 := createTwoChannels()
createFinished(cdb, c1)
return c2, createFinished(cdb, c2)
}
type setupFunc func(cdb kvdb.Backend) (*mig26.OpenChannel, []byte)
testCases := []struct {
name string
setup setupFunc
expectFinish bool
}{
{
name: "empty buckets",
setup: emptyChannel,
expectFinish: true,
},
{
name: "single channel no logs",
setup: singleChannelNoLogs,
expectFinish: true,
},
{
name: "single channel not finished",
setup: singleChannelNotFinished,
expectFinish: false,
},
{
name: "single channel finished",
setup: singleChannelFinished,
expectFinish: true,
},
{
name: "two channels not finished",
setup: twoChannelsNotFinished,
expectFinish: false,
},
{
name: "two channels finished",
setup: twoChannelsFinished,
expectFinish: true,
},
}
// tester is a helper closure that finds the locator.
tester := func(t *testing.T, cdb kvdb.Backend) *updateLocator {
var l *updateLocator
err := kvdb.Update(cdb, func(tx kvdb.RwTx) error {
rootBucket := tx.ReadWriteBucket(openChannelBucket)
// Find the locator.
locator, err := locateNextUpdateNum(rootBucket)
if err != nil {
return err
}
l = locator
return nil
}, func() {})
require.NoError(t, err)
return l
}
for _, tc := range testCases {
// Create a test database.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err)
tc := tc
t.Run(tc.name, func(t *testing.T) {
// Setup the test case.
c, height := tc.setup(cdb)
// Run the test and assert the locator.
locator := tester(t, cdb)
if tc.expectFinish {
require.Nil(t, locator, "expected nil locator")
} else {
assertLocator(t, c, height, locator)
}
})
}
}
func createDummyChannelCommit(height uint64) mig.ChannelCommitment {
htlc := mig.HTLC{
Amt: testHTLCAmt,
RefundTimeout: testRefundTimeout,
OutputIndex: testOutputIndex,
Incoming: testIncoming,
}
copy(htlc.RHash[:], testRHash)
c := mig.ChannelCommitment{
CommitHeight: height,
Htlcs: []mig.HTLC{htlc},
CommitTx: testTx,
}
return c
}

View File

@@ -0,0 +1,361 @@
package migration30
import (
"bytes"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/input"
)
// CommitmentKeyRing holds all derived keys needed to construct commitment and
// HTLC transactions. The keys are derived differently depending whether the
// commitment transaction is ours or the remote peer's. Private keys associated
// with each key may belong to the commitment owner or the "other party" which
// is referred to in the field comments, regardless of which is local and which
// is remote.
type CommitmentKeyRing struct {
// CommitPoint is the "per commitment point" used to derive the tweak
// for each base point.
CommitPoint *btcec.PublicKey
// LocalCommitKeyTweak is the tweak used to derive the local public key
// from the local payment base point or the local private key from the
// base point secret. This may be included in a SignDescriptor to
// generate signatures for the local payment key.
//
// NOTE: This will always refer to "our" local key, regardless of
// whether this is our commit or not.
LocalCommitKeyTweak []byte
// TODO(roasbeef): need delay tweak as well?
// LocalHtlcKeyTweak is the tweak used to derive the local HTLC key
// from the local HTLC base point. This value is needed in order to
// derive the final key used within the HTLC scripts in the commitment
// transaction.
//
// NOTE: This will always refer to "our" local HTLC key, regardless of
// whether this is our commit or not.
LocalHtlcKeyTweak []byte
// LocalHtlcKey is the key that will be used in any clause paying to
// our node of any HTLC scripts within the commitment transaction for
// this key ring set.
//
// NOTE: This will always refer to "our" local HTLC key, regardless of
// whether this is our commit or not.
LocalHtlcKey *btcec.PublicKey
// RemoteHtlcKey is the key that will be used in clauses within the
// HTLC script that send money to the remote party.
//
// NOTE: This will always refer to "their" remote HTLC key, regardless
// of whether this is our commit or not.
RemoteHtlcKey *btcec.PublicKey
// ToLocalKey is the commitment transaction owner's key which is
// included in HTLC success and timeout transaction scripts. This is
// the public key used for the to_local output of the commitment
// transaction.
//
// NOTE: Who's key this is depends on the current perspective. If this
// is our commitment this will be our key.
ToLocalKey *btcec.PublicKey
// ToRemoteKey is the non-owner's payment key in the commitment tx.
// This is the key used to generate the to_remote output within the
// commitment transaction.
//
// NOTE: Who's key this is depends on the current perspective. If this
// is our commitment this will be their key.
ToRemoteKey *btcec.PublicKey
// RevocationKey is the key that can be used by the other party to
// redeem outputs from a revoked commitment transaction if it were to
// be published.
//
// NOTE: Who can sign for this key depends on the current perspective.
// If this is our commitment, it means the remote node can sign for
// this key in case of a breach.
RevocationKey *btcec.PublicKey
}
// ScriptInfo holds a redeem script and hash.
type ScriptInfo struct {
// PkScript is the output's PkScript.
PkScript []byte
// WitnessScript is the full script required to properly redeem the
// output. This field should be set to the full script if a p2wsh
// output is being signed. For p2wkh it should be set equal to the
// PkScript.
WitnessScript []byte
}
// findOutputIndexesFromRemote finds the index of our and their outputs from
// the remote commitment transaction. It derives the key ring to compute the
// output scripts and compares them against the outputs inside the commitment
// to find the match.
func findOutputIndexesFromRemote(revocationPreimage *chainhash.Hash,
chanState *mig26.OpenChannel,
oldLog *mig.ChannelCommitment) (uint32, uint32, error) {
// Init the output indexes as empty.
ourIndex := uint32(OutputIndexEmpty)
theirIndex := uint32(OutputIndexEmpty)
chanCommit := oldLog
_, commitmentPoint := btcec.PrivKeyFromBytes(revocationPreimage[:])
// With the commitment point generated, we can now derive the king ring
// which will be used to generate the output scripts.
keyRing := DeriveCommitmentKeys(
commitmentPoint, false, chanState.ChanType,
&chanState.LocalChanCfg, &chanState.RemoteChanCfg,
)
// Since it's remote commitment chain, we'd used the mirrored values.
//
// We use the remote's channel config for the csv delay.
theirDelay := uint32(chanState.RemoteChanCfg.CsvDelay)
// If we are the initiator of this channel, then it's be false from the
// remote's PoV.
isRemoteInitiator := !chanState.IsInitiator
var leaseExpiry uint32
if chanState.ChanType.HasLeaseExpiration() {
leaseExpiry = chanState.ThawHeight
}
// Map the scripts from our PoV. When facing a local commitment, the to
// local output belongs to us and the to remote output belongs to them.
// When facing a remote commitment, the to local output belongs to them
// and the to remote output belongs to us.
// Compute the to local script. From our PoV, when facing a remote
// commitment, the to local output belongs to them.
theirScript, err := CommitScriptToSelf(
chanState.ChanType, isRemoteInitiator, keyRing.ToLocalKey,
keyRing.RevocationKey, theirDelay, leaseExpiry,
)
if err != nil {
return ourIndex, theirIndex, err
}
// Compute the to remote script. From our PoV, when facing a remote
// commitment, the to remote output belongs to us.
ourScript, _, err := CommitScriptToRemote(
chanState.ChanType, isRemoteInitiator, keyRing.ToRemoteKey,
leaseExpiry,
)
if err != nil {
return ourIndex, theirIndex, err
}
// Now compare the scripts to find our/their output index.
for i, txOut := range chanCommit.CommitTx.TxOut {
switch {
case bytes.Equal(txOut.PkScript, ourScript.PkScript):
ourIndex = uint32(i)
case bytes.Equal(txOut.PkScript, theirScript.PkScript):
theirIndex = uint32(i)
}
}
return ourIndex, theirIndex, nil
}
// DeriveCommitmentKeys generates a new commitment key set using the base points
// and commitment point. The keys are derived differently depending on the type
// of channel, and whether the commitment transaction is ours or the remote
// peer's.
func DeriveCommitmentKeys(commitPoint *btcec.PublicKey,
isOurCommit bool, chanType mig25.ChannelType,
localChanCfg, remoteChanCfg *mig.ChannelConfig) *CommitmentKeyRing {
tweaklessCommit := chanType.IsTweakless()
// Depending on if this is our commit or not, we'll choose the correct
// base point.
localBasePoint := localChanCfg.PaymentBasePoint
if isOurCommit {
localBasePoint = localChanCfg.DelayBasePoint
}
// First, we'll derive all the keys that don't depend on the context of
// whose commitment transaction this is.
keyRing := &CommitmentKeyRing{
CommitPoint: commitPoint,
LocalCommitKeyTweak: input.SingleTweakBytes(
commitPoint, localBasePoint.PubKey,
),
LocalHtlcKeyTweak: input.SingleTweakBytes(
commitPoint, localChanCfg.HtlcBasePoint.PubKey,
),
LocalHtlcKey: input.TweakPubKey(
localChanCfg.HtlcBasePoint.PubKey, commitPoint,
),
RemoteHtlcKey: input.TweakPubKey(
remoteChanCfg.HtlcBasePoint.PubKey, commitPoint,
),
}
// We'll now compute the to_local, to_remote, and revocation key based
// on the current commitment point. All keys are tweaked each state in
// order to ensure the keys from each state are unlinkable. To create
// the revocation key, we take the opposite party's revocation base
// point and combine that with the current commitment point.
var (
toLocalBasePoint *btcec.PublicKey
toRemoteBasePoint *btcec.PublicKey
revocationBasePoint *btcec.PublicKey
)
if isOurCommit {
toLocalBasePoint = localChanCfg.DelayBasePoint.PubKey
toRemoteBasePoint = remoteChanCfg.PaymentBasePoint.PubKey
revocationBasePoint = remoteChanCfg.RevocationBasePoint.PubKey
} else {
toLocalBasePoint = remoteChanCfg.DelayBasePoint.PubKey
toRemoteBasePoint = localChanCfg.PaymentBasePoint.PubKey
revocationBasePoint = localChanCfg.RevocationBasePoint.PubKey
}
// With the base points assigned, we can now derive the actual keys
// using the base point, and the current commitment tweak.
keyRing.ToLocalKey = input.TweakPubKey(toLocalBasePoint, commitPoint)
keyRing.RevocationKey = input.DeriveRevocationPubkey(
revocationBasePoint, commitPoint,
)
// If this commitment should omit the tweak for the remote point, then
// we'll use that directly, and ignore the commitPoint tweak.
if tweaklessCommit {
keyRing.ToRemoteKey = toRemoteBasePoint
// If this is not our commitment, the above ToRemoteKey will be
// ours, and we blank out the local commitment tweak to
// indicate that the key should not be tweaked when signing.
if !isOurCommit {
keyRing.LocalCommitKeyTweak = nil
}
} else {
keyRing.ToRemoteKey = input.TweakPubKey(
toRemoteBasePoint, commitPoint,
)
}
return keyRing
}
// CommitScriptToRemote derives the appropriate to_remote script based on the
// channel's commitment type. The `initiator` argument should correspond to the
// owner of the commitment transaction which we are generating the to_remote
// script for. The second return value is the CSV delay of the output script,
// what must be satisfied in order to spend the output.
func CommitScriptToRemote(chanType mig25.ChannelType, initiator bool,
key *btcec.PublicKey, leaseExpiry uint32) (*ScriptInfo, uint32, error) {
switch {
// If we are not the initiator of a leased channel, then the remote
// party has an additional CLTV requirement in addition to the 1 block
// CSV requirement.
case chanType.HasLeaseExpiration() && !initiator:
script, err := input.LeaseCommitScriptToRemoteConfirmed(
key, leaseExpiry,
)
if err != nil {
return nil, 0, err
}
p2wsh, err := input.WitnessScriptHash(script)
if err != nil {
return nil, 0, err
}
return &ScriptInfo{
PkScript: p2wsh,
WitnessScript: script,
}, 1, nil
// If this channel type has anchors, we derive the delayed to_remote
// script.
case chanType.HasAnchors():
script, err := input.CommitScriptToRemoteConfirmed(key)
if err != nil {
return nil, 0, err
}
p2wsh, err := input.WitnessScriptHash(script)
if err != nil {
return nil, 0, err
}
return &ScriptInfo{
PkScript: p2wsh,
WitnessScript: script,
}, 1, nil
default:
// Otherwise the to_remote will be a simple p2wkh.
p2wkh, err := input.CommitScriptUnencumbered(key)
if err != nil {
return nil, 0, err
}
// Since this is a regular P2WKH, the WitnessScipt and PkScript
// should both be set to the script hash.
return &ScriptInfo{
WitnessScript: p2wkh,
PkScript: p2wkh,
}, 0, nil
}
}
// CommitScriptToSelf constructs the public key script for the output on the
// commitment transaction paying to the "owner" of said commitment transaction.
// The `initiator` argument should correspond to the owner of the commitment
// transaction which we are generating the to_local script for. If the other
// party learns of the preimage to the revocation hash, then they can claim all
// the settled funds in the channel, plus the unsettled funds.
func CommitScriptToSelf(chanType mig25.ChannelType, initiator bool,
selfKey, revokeKey *btcec.PublicKey, csvDelay, leaseExpiry uint32) (
*ScriptInfo, error) {
var (
toLocalRedeemScript []byte
err error
)
switch {
// If we are the initiator of a leased channel, then we have an
// additional CLTV requirement in addition to the usual CSV requirement.
case initiator && chanType.HasLeaseExpiration():
toLocalRedeemScript, err = input.LeaseCommitScriptToSelf(
selfKey, revokeKey, csvDelay, leaseExpiry,
)
default:
toLocalRedeemScript, err = input.CommitScriptToSelf(
csvDelay, selfKey, revokeKey,
)
}
if err != nil {
return nil, err
}
toLocalScriptHash, err := input.WitnessScriptHash(toLocalRedeemScript)
if err != nil {
return nil, err
}
return &ScriptInfo{
PkScript: toLocalScriptHash,
WitnessScript: toLocalRedeemScript,
}, nil
}

View File

@@ -0,0 +1,14 @@
package migration30
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@@ -0,0 +1,656 @@
package migration30
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sync"
mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/kvdb"
)
// recordsPerTx specifies the number of records to be migrated in each database
// transaction. In the worst case, each old revocation log is 28,057 bytes.
// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern
// machine.
//
// NOTE: we could've used more ram but it doesn't help with the speed of the
// migration since the most of the CPU time is used for calculating the output
// indexes.
const recordsPerTx = 20_000
// MigrateRevocationLog migrates the old revocation logs into the newer format
// and deletes them once finished, with the deletion only happens once ALL the
// old logs have been migrates.
func MigrateRevocationLog(db kvdb.Backend) error {
log.Infof("Migrating revocation logs, might take a while...")
var (
err error
// finished is used to exit the for loop.
finished bool
// total is the number of total records.
total uint64
// migrated is the number of already migrated records.
migrated uint64
)
// First of all, read the stats of the revocation logs.
total, migrated, err = logMigrationStat(db)
if err != nil {
return err
}
log.Infof("Total logs=%d, migrated=%d", total, migrated)
// Exit early if the old logs have already been migrated and deleted.
if total == 0 {
log.Info("Migration already finished!")
return nil
}
for {
if finished {
log.Infof("Migrating old revocation logs finished, " +
"now checking the migration results...")
break
}
// Process the migration.
err = kvdb.Update(db, func(tx kvdb.RwTx) error {
finished, err = processMigration(tx)
if err != nil {
return err
}
return nil
}, func() {})
if err != nil {
return err
}
// Each time we finished the above process, we'd read the stats
// again to understand the current progress.
total, migrated, err = logMigrationStat(db)
if err != nil {
return err
}
// Calculate and log the progress if the progress is less than
// one.
progress := float64(migrated) / float64(total) * 100
if progress >= 100 {
continue
}
log.Infof("Migration progress: %.3f%%, still have: %d",
progress, total-migrated)
}
// Before we can safety delete the old buckets, we perform a check to
// make sure the logs are migrated as expected.
err = kvdb.Update(db, validateMigration, func() {})
if err != nil {
return fmt.Errorf("validate migration failed: %v", err)
}
log.Info("Migration check passed, now deleting the old logs...")
// Once the migration completes, we can now safety delete the old
// revocation logs.
if err := deleteOldBuckets(db); err != nil {
return fmt.Errorf("deleteOldBuckets err: %v", err)
}
log.Info("Old revocation log buckets removed!")
return nil
}
// processMigration finds the next un-migrated revocation logs, reads a max
// number of `recordsPerTx` records, converts them into the new revocation logs
// and save them to disk.
func processMigration(tx kvdb.RwTx) (bool, error) {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early.
if openChanBucket == nil {
return false, fmt.Errorf("root bucket not found")
}
// Locate the next migration height.
locator, err := locateNextUpdateNum(openChanBucket)
if err != nil {
return false, fmt.Errorf("locator got error: %v", err)
}
// If the returned locator is nil, we've done migrating the logs.
if locator == nil {
return true, nil
}
// Read a list of old revocation logs.
entryMap, err := readOldRevocationLogs(openChanBucket, locator)
if err != nil {
return false, fmt.Errorf("read old logs err: %v", err)
}
// Migrate the revocation logs.
return false, writeRevocationLogs(openChanBucket, entryMap)
}
// deleteOldBuckets iterates all the channel buckets and deletes the old
// revocation buckets.
func deleteOldBuckets(db kvdb.Backend) error {
// locators records all the chan buckets found in the database.
var locators []*updateLocator
// reader is a helper closure that saves the locator found. Each
// locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of
// ram we can fit roughly 10 million records. Since each record
// corresponds to a channel, we should have more than enough memory to
// read them all.
reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
locators = append(locators, l)
return nil
}
// remover is a helper closure that removes the old revocation log
// bucket under the specified chan bucket by the given locator.
remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error {
chanBucket, err := l.locateChanBucket(rootBucket)
if err != nil {
return err
}
return chanBucket.DeleteNestedBucket(
revocationLogBucketDeprecated,
)
}
// Perform the deletion in one db transaction. This should not cause
// any memory issue as the deletion doesn't load any data from the
// buckets.
return kvdb.Update(db, func(tx kvdb.RwTx) error {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// Exit early if there's no bucket.
if openChanBucket == nil {
return nil
}
// Iterate the buckets to find all the locators.
err := iterateBuckets(openChanBucket, nil, reader)
if err != nil {
return err
}
// Iterate the locators and delete all the old revocation log
// buckets.
for _, l := range locators {
err := remover(openChanBucket, l)
// If the bucket doesn't exist, we can exit safety.
if err != nil && err != kvdb.ErrBucketNotFound {
return err
}
}
return nil
}, func() {})
}
// writeRevocationLogs unwraps the entryMap and writes the new revocation logs.
func writeRevocationLogs(openChanBucket kvdb.RwBucket,
entryMap logEntries) error {
for locator, logs := range entryMap {
// Find the channel bucket.
chanBucket, err := locator.locateChanBucket(openChanBucket)
if err != nil {
return fmt.Errorf("locateChanBucket err: %v", err)
}
// Create the new log bucket.
logBucket, err := chanBucket.CreateBucketIfNotExists(
revocationLogBucket,
)
if err != nil {
return fmt.Errorf("create log bucket err: %v", err)
}
// Write the new logs.
for _, entry := range logs {
var b bytes.Buffer
err := serializeRevocationLog(&b, entry.log)
if err != nil {
return err
}
logEntrykey := mig24.MakeLogKey(entry.commitHeight)
err = logBucket.Put(logEntrykey[:], b.Bytes())
if err != nil {
return fmt.Errorf("putRevocationLog err: %v",
err)
}
}
}
return nil
}
// logMigrationStat reads the buckets to provide stats over current migration
// progress. The returned values are the numbers of total records and already
// migrated records.
func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
var (
err error
// total is the number of total records.
total uint64
// unmigrated is the number of unmigrated records.
unmigrated uint64
)
err = kvdb.Update(db, func(tx kvdb.RwTx) error {
total, unmigrated, err = fetchLogStats(tx)
return err
}, func() {})
log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
return total, total - unmigrated, err
}
// fetchLogStats iterates all the chan buckets to provide stats about the logs.
// The returned values are num of total records, and num of un-migrated
// records.
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
var (
total uint64
totalUnmigrated uint64
)
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early.
if openChanBucket == nil {
return 0, 0, fmt.Errorf("root bucket not found")
}
// counter is a helper closure used to count the number of records
// based on the given bucket.
counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 {
// Read the sub-bucket level 4.
logBucket := chanBucket.NestedReadBucket(bucket)
// Exit early if we don't have the bucket.
if logBucket == nil {
return 0
}
// Jump to the end of the cursor.
key, _ := logBucket.ReadCursor().Last()
// Since the CommitHeight is a zero-based monotonically
// increased index, its value plus one reflects the total
// records under this chan bucket.
lastHeight := binary.BigEndian.Uint64(key) + 1
return lastHeight
}
// countTotal is a callback function used to count the total number of
// records.
countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
total += counter(chanBucket, revocationLogBucketDeprecated)
return nil
}
// countUnmigrated is a callback function used to count the total
// number of un-migrated records.
countUnmigrated := func(chanBucket kvdb.RwBucket,
l *updateLocator) error {
totalUnmigrated += counter(
chanBucket, revocationLogBucketDeprecated,
)
return nil
}
// Locate the next migration height.
locator, err := locateNextUpdateNum(openChanBucket)
if err != nil {
return 0, 0, fmt.Errorf("locator got error: %v", err)
}
// If the returned locator is not nil, we still have un-migrated
// records so we need to count them. Otherwise we've done migrating the
// logs.
if locator != nil {
err = iterateBuckets(openChanBucket, locator, countUnmigrated)
if err != nil {
return 0, 0, err
}
}
// Count the total number of records by supplying a nil locator.
err = iterateBuckets(openChanBucket, nil, countTotal)
if err != nil {
return 0, 0, err
}
return total, totalUnmigrated, err
}
// logEntry houses the info needed to write a new revocation log.
type logEntry struct {
log *RevocationLog
commitHeight uint64
ourIndex uint32
theirIndex uint32
locator *updateLocator
}
// logEntries maps a bucket locator to a list of entries under that bucket.
type logEntries map[*updateLocator][]*logEntry
// result is made of two channels that's used to send back the constructed new
// revocation log or an error.
type result struct {
newLog chan *logEntry
errChan chan error
}
// readOldRevocationLogs finds a list of old revocation logs and converts them
// into the new revocation logs.
func readOldRevocationLogs(openChanBucket kvdb.RwBucket,
locator *updateLocator) (logEntries, error) {
entries := make(logEntries)
results := make([]*result, 0)
var wg sync.WaitGroup
// collectLogs is a helper closure that reads all newly created
// revocation logs sent over the result channels.
//
// NOTE: the order of the logs cannot be guaranteed, which is fine as
// boltdb will take care of the orders when saving them.
collectLogs := func() error {
wg.Wait()
for _, r := range results {
select {
case entry := <-r.newLog:
entries[entry.locator] = append(
entries[entry.locator], entry,
)
case err := <-r.errChan:
return err
}
}
return nil
}
// createLog is a helper closure that constructs a new revocation log.
//
// NOTE: used as a goroutine.
createLog := func(chanState *mig26.OpenChannel,
c mig.ChannelCommitment, l *updateLocator, r *result) {
defer wg.Done()
// Find the output indexes.
ourIndex, theirIndex, err := findOutputIndexes(chanState, &c)
if err != nil {
r.errChan <- err
}
// Convert the old logs into the new logs. We do this early in
// the read tx so the old large revocation log can be set to
// nil here so save us some memory space.
newLog, err := convertRevocationLog(&c, ourIndex, theirIndex)
if err != nil {
r.errChan <- err
}
// Create the entry that will be used to create the new log.
entry := &logEntry{
log: newLog,
commitHeight: c.CommitHeight,
ourIndex: ourIndex,
theirIndex: theirIndex,
locator: l,
}
r.newLog <- entry
}
// innerCb is the stepping function used when iterating the old log
// bucket.
innerCb := func(chanState *mig26.OpenChannel, l *updateLocator,
_, v []byte) error {
reader := bytes.NewReader(v)
c, err := mig.DeserializeChanCommit(reader)
if err != nil {
return err
}
r := &result{
newLog: make(chan *logEntry, 1),
errChan: make(chan error, 1),
}
results = append(results, r)
// We perform the log creation in a goroutine as it takes some
// time to compute and find output indexes.
wg.Add(1)
go createLog(chanState, c, l, r)
// Check the records read so far and signals exit when we've
// reached our memory cap.
if len(results) >= recordsPerTx {
return errExit
}
return nil
}
// cb is the callback function to be used when iterating the buckets.
cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
// Read the open channel.
c := &mig26.OpenChannel{}
err := mig26.FetchChanInfo(chanBucket, c, false)
if err != nil {
return fmt.Errorf("unable to fetch chan info: %v", err)
}
err = fetchChanRevocationState(chanBucket, c)
if err != nil {
return fmt.Errorf("unable to fetch revocation "+
"state: %v", err)
}
// Read the sub-bucket level 4.
logBucket := chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
// Exit early if we don't have the old bucket.
if logBucket == nil {
return nil
}
// Init the map key when needed.
_, ok := entries[l]
if !ok {
entries[l] = make([]*logEntry, 0, recordsPerTx)
}
return iterator(
logBucket, locator.nextHeight,
func(k, v []byte) error {
// Reset the nextHeight for following chan
// buckets.
locator.nextHeight = nil
return innerCb(c, l, k, v)
},
)
}
err := iterateBuckets(openChanBucket, locator, cb)
// If there's an error and it's not exit signal, we won't collect the
// logs from the result channels.
if err != nil && err != errExit {
return nil, err
}
// Otherwise, collect the logs.
err = collectLogs()
return entries, err
}
// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a
// ChannelCommitment to construct a revocation log entry.
func convertRevocationLog(commit *mig.ChannelCommitment,
ourOutputIndex, theirOutputIndex uint32) (*RevocationLog, error) {
// Sanity check that the output indexes can be safely converted.
if ourOutputIndex > math.MaxUint16 {
return nil, ErrOutputIndexTooBig
}
if theirOutputIndex > math.MaxUint16 {
return nil, ErrOutputIndexTooBig
}
rl := &RevocationLog{
OurOutputIndex: uint16(ourOutputIndex),
TheirOutputIndex: uint16(theirOutputIndex),
CommitTxHash: commit.CommitTx.TxHash(),
HTLCEntries: make([]*HTLCEntry, 0, len(commit.Htlcs)),
}
for _, htlc := range commit.Htlcs {
// Skip dust HTLCs.
if htlc.OutputIndex < 0 {
continue
}
// Sanity check that the output indexes can be safely
// converted.
if htlc.OutputIndex > math.MaxUint16 {
return nil, ErrOutputIndexTooBig
}
entry := &HTLCEntry{
RHash: htlc.RHash,
RefundTimeout: htlc.RefundTimeout,
Incoming: htlc.Incoming,
OutputIndex: uint16(htlc.OutputIndex),
Amt: htlc.Amt.ToSatoshis(),
}
rl.HTLCEntries = append(rl.HTLCEntries, entry)
}
return rl, nil
}
// validateMigration checks that the data saved in the new buckets match those
// saved in the old buckets. It does so by checking the last keys saved in both
// buckets can match, given the assumption that the `CommitHeight` is
// monotonically increased value so the last key represents the total number of
// records saved.
func validateMigration(tx kvdb.RwTx) error {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
// If no bucket is found, we can exit early.
if openChanBucket == nil {
return nil
}
// exitWithErr is a helper closure that prepends an error message with
// the locator info.
exitWithErr := func(l *updateLocator, msg string) error {
return fmt.Errorf("unmatched records found under <nodePub=%x"+
", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
l.chainHash, l.fundingOutpoint, msg)
}
// cb is the callback function to be used when iterating the buckets.
cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
// Read both the old and new revocation log buckets.
oldBucket := chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
// Exit early if the old bucket is nil.
//
// NOTE: the new bucket may not be nil here as new logs might
// have been created using lnd@v0.15.0.
if oldBucket == nil {
return nil
}
// Return an error if the expected new bucket cannot be found.
if newBucket == nil {
return exitWithErr(l, "expected new bucket")
}
// Acquire the cursors.
oldCursor := oldBucket.ReadCursor()
newCursor := newBucket.ReadCursor()
// Jump to the end of the cursors to do a quick check.
newKey, _ := oldCursor.Last()
oldKey, _ := newCursor.Last()
// We expected the CommitHeights to be matched for nodes prior
// to v0.15.0.
if bytes.Equal(newKey, oldKey) {
return nil
}
// If the keys do not match, it's likely the node is running
// v0.15.0 and have new logs created. In this case, we will
// validate that every record in the old bucket can be found in
// the new bucket.
oldKey, _ = oldCursor.First()
for {
// Try to locate the old key in the new bucket and we
// expect it to be found.
newKey, _ := newCursor.Seek(oldKey)
// If the old key is not found in the new bucket,
// return an error.
//
// NOTE: because Seek will return the next key when the
// passed key cannot be found, we need to compare the
// keys to deicde whether the old key is found or not.
if !bytes.Equal(newKey, oldKey) {
errMsg := fmt.Sprintf("old bucket has "+
"CommitHeight=%v cannot be found in "+
"new bucket", oldKey)
return exitWithErr(l, errMsg)
}
// Otherwise, keep iterating the old bucket.
oldKey, _ = oldCursor.Next()
// If we've done iterating, all keys have been matched
// and we can safely exit.
if oldKey == nil {
return nil
}
}
}
return iterateBuckets(openChanBucket, nil, cb)
}

View File

@@ -0,0 +1,574 @@
package migration30
import (
"bytes"
"fmt"
"testing"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type (
beforeMigrationFunc func(db kvdb.Backend) error
afterMigrationFunc func(t *testing.T, db kvdb.Backend)
)
// TestMigrateRevocationLog provide a comprehensive test for the revocation log
// migration. The revocation logs are stored inside a deeply nested bucket, and
// can be accessed via nodePub:chainHash:fundingOutpoint:revocationLogBucket.
// Based on each value in the chain, we'd end up in a different db state. This
// test alters nodePub, fundingOutpoint, and revocationLogBucket to test
// against possible db states, leaving the chainHash staying the same as it's
// less likely to be changed. In specific, we test based on whether we have one
// or two peers(nodePub). For each peer, we test whether we have one or two
// channels(fundingOutpoint). And for each channel, we test 5 cases based on
// the revocation migration states(see buildChannelCases). The total states
// grow quickly and the test may take longer than 5min.
func TestMigrateRevocationLog(t *testing.T) {
t.Parallel()
testCases := make([]*testCase, 0)
// Create two peers, each has two channels.
alice1, alice2 := createTwoChannels()
bob1, bob2 := createTwoChannels()
// Sort the two peers to match the order saved in boltdb.
if bytes.Compare(
alice1.IdentityPub.SerializeCompressed(),
bob1.IdentityPub.SerializeCompressed(),
) > 0 {
alice1, bob1 = bob1, alice1
alice2, bob2 = bob2, alice2
}
// Build test cases for two peers. Each peer is independent so we
// combine the test cases based on its current db state. This would
// create a total of 30x30=900 cases.
for _, p1 := range buildPeerCases(alice1, alice2, false) {
for _, p2 := range buildPeerCases(bob1, bob2, p1.unfinished) {
setups := make([]beforeMigrationFunc, 0)
setups = append(setups, p1.setups...)
setups = append(setups, p2.setups...)
asserters := make([]afterMigrationFunc, 0)
asserters = append(asserters, p1.asserters...)
asserters = append(asserters, p2.asserters...)
name := fmt.Sprintf("alice: %s, bob: %s",
p1.name, p2.name)
tc := &testCase{
name: name,
setups: setups,
asserters: asserters,
}
testCases = append(testCases, tc)
}
}
fmt.Printf("Running %d test cases...\n", len(testCases))
for i, tc := range testCases {
tc := tc
// Construct a test case name that can be easily traced.
name := fmt.Sprintf("case_%d", i)
fmt.Println(name, tc.name)
success := t.Run(name, func(t *testing.T) {
// Log the test's actual name on failure.
t.Log("Test setup: ", tc.name)
beforeMigration := func(db kvdb.Backend) error {
for _, setup := range tc.setups {
if err := setup(db); err != nil {
return err
}
}
return nil
}
afterMigration := func(db kvdb.Backend) error {
for _, asserter := range tc.asserters {
asserter(t, db)
}
return nil
}
migtest.ApplyMigrationWithDb(
t,
beforeMigration,
afterMigration,
MigrateRevocationLog,
)
})
if !success {
return
}
}
}
// TestValidateMigration checks that the function `validateMigration` behaves
// as expected.
func TestValidateMigration(t *testing.T) {
c := createTestChannel(nil)
testCases := []struct {
name string
setup func(db kvdb.Backend) error
expectFail bool
}{
{
// Finished prior to v0.15.0.
name: "valid migration",
setup: func(db kvdb.Backend) error {
return createFinished(db, c, true)
},
expectFail: false,
},
{
// Finished after to v0.15.0.
name: "valid migration after v0.15.0",
setup: func(db kvdb.Backend) error {
return createFinished(db, c, false)
},
expectFail: false,
},
{
// Missing logs prior to v0.15.0.
name: "invalid migration",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c, true)
},
expectFail: true,
},
{
// Missing logs after to v0.15.0.
name: "invalid migration after v0.15.0",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c, false)
},
expectFail: true,
},
}
for _, tc := range testCases {
tc := tc
// Create a test db.
cdb, cleanUp, err := migtest.MakeDB()
defer cleanUp()
require.NoError(t, err, "failed to create test db")
t.Run(tc.name, func(t *testing.T) {
// Setup test logs.
err := tc.setup(cdb)
require.NoError(t, err, "failed to setup")
// Call the actual function and check the error is
// returned as expected.
err = kvdb.Update(cdb, validateMigration, func() {})
if tc.expectFail {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
// createTwoChannels creates two channels that have the same chainHash and
// IdentityPub, simulating having two channels under the same peer.
func createTwoChannels() (*mig26.OpenChannel, *mig26.OpenChannel) {
// Create two channels under the same peer.
c1 := createTestChannel(nil)
c2 := createTestChannel(c1.IdentityPub)
// If c1 is greater than c2, boltdb will put c2 before c1.
if bytes.Compare(
c1.FundingOutpoint.Hash[:],
c2.FundingOutpoint.Hash[:],
) > 0 {
c1, c2 = c2, c1
}
return c1, c2
}
// channelTestCase defines a single test case given a particular channel state.
type channelTestCase struct {
name string
setup beforeMigrationFunc
asserter afterMigrationFunc
unfinished bool
}
// buildChannelCases builds five channel test cases. These cases can be viewed
// as basic units that are used to build more complex test cases based on
// number of channels and peers.
func buildChannelCases(c *mig26.OpenChannel,
overwrite bool) []*channelTestCase {
// assertNewLogs is a helper closure that checks the old bucket and the
// two new logs are saved.
assertNewLogs := func(t *testing.T, db kvdb.Backend) {
// Check that the old bucket is removed.
assertOldLogBucketDeleted(t, db, c)
l := fetchNewLog(t, db, c, logHeight1)
assertRevocationLog(t, newLog1, l)
l = fetchNewLog(t, db, c, logHeight2)
assertRevocationLog(t, newLog2, l)
}
// case1 defines a case where we don't have a chanBucket.
case1 := &channelTestCase{
name: "no channel",
setup: func(db kvdb.Backend) error {
return setupTestLogs(db, nil, nil, nil)
},
// No need to assert anything.
asserter: func(t *testing.T, db kvdb.Backend) {},
}
// case2 defines a case when the chanBucket has no old revocation logs.
case2 := &channelTestCase{
name: "empty old logs",
setup: func(db kvdb.Backend) error {
return setupTestLogs(db, c, nil, nil)
},
// No need to assert anything.
asserter: func(t *testing.T, db kvdb.Backend) {},
}
// case3 defines a case when the chanBucket has finished its migration.
case3 := &channelTestCase{
name: "finished migration",
setup: func(db kvdb.Backend) error {
return createFinished(db, c, true)
},
asserter: func(t *testing.T, db kvdb.Backend) {
// Check that the old bucket is removed.
assertOldLogBucketDeleted(t, db, c)
// Fetch the new log. We should see
// OurOutputIndex matching the testOurIndex
// value, indicating that for migrated logs we
// won't touch them.
//
// NOTE: when the log is created before
// migration, OurOutputIndex would be
// testOurIndex rather than OutputIndexEmpty.
l := fetchNewLog(t, db, c, logHeight1)
require.EqualValues(
t, testOurIndex, l.OurOutputIndex,
"expected log to be NOT overwritten",
)
// Fetch the new log. We should see
// TheirOutputIndex matching the testTheirIndex
// value, indicating that for migrated logs we
// won't touch them.
//
// NOTE: when the log is created before
// migration, TheirOutputIndex would be
// testTheirIndex rather than OutputIndexEmpty.
l = fetchNewLog(t, db, c, logHeight2)
require.EqualValues(
t, testTheirIndex, l.TheirOutputIndex,
"expected log to be NOT overwritten",
)
},
}
// case4 defines a case when the chanBucket has both old and new logs,
// which happens when the migration is ongoing.
case4 := &channelTestCase{
name: "unfinished migration",
setup: func(db kvdb.Backend) error {
return createNotFinished(db, c, true)
},
asserter: func(t *testing.T, db kvdb.Backend) {
// Check that the old bucket is removed.
assertOldLogBucketDeleted(t, db, c)
// Fetch the new log. We should see
// OurOutputIndex matching the testOurIndex
// value, indicating that for migrated logs we
// won't touch them.
//
// NOTE: when the log is created before
// migration, OurOutputIndex would be
// testOurIndex rather than OutputIndexEmpty.
l := fetchNewLog(t, db, c, logHeight1)
require.EqualValues(
t, testOurIndex, l.OurOutputIndex,
"expected log to be NOT overwritten",
)
// We expect to have one new log.
l = fetchNewLog(t, db, c, logHeight2)
assertRevocationLog(t, newLog2, l)
},
unfinished: true,
}
// case5 defines a case when the chanBucket has no new logs, which
// happens when we haven't migrated anything for this bucket yet.
case5 := &channelTestCase{
name: "initial migration",
setup: func(db kvdb.Backend) error {
return createNotStarted(db, c, true)
},
asserter: assertNewLogs,
unfinished: true,
}
// Check that the already migrated logs are overwritten. For two
// channels sorted and stored in boltdb, when the first channel has
// unfinished migrations, even channel two has migrated logs, they will
// be overwritten to make sure the data stay consistent.
if overwrite {
case3.name += " overwritten"
case3.asserter = assertNewLogs
case4.name += " overwritten"
case4.asserter = assertNewLogs
}
return []*channelTestCase{case1, case2, case3, case4, case5}
}
// testCase defines a case for a particular db state that we want to test based
// on whether we have one or two peers, one or two channels for each peer, and
// the particular state for each channel.
type testCase struct {
// name has the format: peer: [channel state].
name string
// setups is a list of setup functions we'd run sequentially to provide
// the initial db state.
setups []beforeMigrationFunc
// asserters is a list of assertions we'd perform after the migration
// function has been called.
asserters []afterMigrationFunc
// unfinished specifies that the test case is testing a case where the
// revocation migration is considered unfinished. This is useful if
// it's used to construct a larger test case where there's a following
// case with a state of finished, we can then test that the revocation
// logs are overwritten even if the state says finished.
unfinished bool
}
// buildPeerCases builds test cases based on whether we have one or two
// channels saved under this peer. When there's one channel, we have 5 states,
// and when there are two, we have 25 states, a total of 30 cases.
func buildPeerCases(c1, c2 *mig26.OpenChannel, unfinished bool) []*testCase {
testCases := make([]*testCase, 0)
// Single peer with one channel.
for _, c := range buildChannelCases(c1, unfinished) {
name := fmt.Sprintf("[channel: %s]", c.name)
tc := &testCase{
name: name,
setups: []beforeMigrationFunc{c.setup},
asserters: []afterMigrationFunc{c.asserter},
unfinished: c.unfinished,
}
testCases = append(testCases, tc)
}
// Single peer with two channels.
testCases = append(
testCases, buildTwoChannelCases(c1, c2, unfinished)...,
)
return testCases
}
// buildTwoChannelCases takes two channels to build test cases that covers all
// combinations of the two channels' state. Since each channel has 5 states,
// this will give us a total 25 states.
func buildTwoChannelCases(c1, c2 *mig26.OpenChannel,
unfinished bool) []*testCase {
testCases := make([]*testCase, 0)
// buildCase is a helper closure that contructs a test case based on
// the two smaller test cases.
buildCase := func(tc1, tc2 *channelTestCase) {
setups := make([]beforeMigrationFunc, 0)
setups = append(setups, tc1.setup)
setups = append(setups, tc2.setup)
asserters := make([]afterMigrationFunc, 0)
asserters = append(asserters, tc1.asserter)
asserters = append(asserters, tc2.asserter)
// If any of the test cases has unfinished state, the test case
// would have a state of unfinished, indicating any peers after
// this one must overwrite their revocation logs.
unfinished := tc1.unfinished || tc2.unfinished
name := fmt.Sprintf("[channelOne: %s] [channelTwo: %s]",
tc1.name, tc2.name)
tc := &testCase{
name: name,
setups: setups,
asserters: asserters,
unfinished: unfinished,
}
testCases = append(testCases, tc)
}
// Build channel cases for both of the channels and combine them.
for _, tc1 := range buildChannelCases(c1, unfinished) {
// The second channel's already migrated logs will be
// overwritten if the first channel has unfinished state, which
// are case4 and case5.
unfinished := unfinished || tc1.unfinished
for _, tc2 := range buildChannelCases(c2, unfinished) {
buildCase(tc1, tc2)
}
}
return testCases
}
// assertOldLogBucketDeleted asserts that the given channel's old revocation
// log bucket doesn't exist.
func assertOldLogBucketDeleted(t testing.TB, cdb kvdb.Backend,
c *mig26.OpenChannel) {
var logBucket kvdb.RBucket
err := kvdb.Update(cdb, func(tx kvdb.RwTx) error {
chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
logBucket = chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
return err
}, func() {})
require.NoError(t, err, "read bucket failed")
require.Nil(t, logBucket, "expected old bucket to be deleted")
}
// fetchNewLog asserts a revocation log can be found using the given updateNum
// for the specified channel.
func fetchNewLog(t testing.TB, cdb kvdb.Backend,
c *mig26.OpenChannel, updateNum uint64) RevocationLog {
var newLog RevocationLog
err := kvdb.Update(cdb, func(tx kvdb.RwTx) error {
chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
logBucket, err := fetchLogBucket(chanBucket)
if err != nil {
return err
}
newLog, err = fetchRevocationLog(logBucket, updateNum)
return err
}, func() {})
require.NoError(t, err, "failed to query revocation log")
return newLog
}
// assertRevocationLog asserts two revocation logs are equal.
func assertRevocationLog(t testing.TB, want, got RevocationLog) {
require.Equal(t, want.OurOutputIndex, got.OurOutputIndex,
"wrong OurOutputIndex")
require.Equal(t, want.TheirOutputIndex, got.TheirOutputIndex,
"wrong TheirOutputIndex")
require.Equal(t, want.CommitTxHash, got.CommitTxHash,
"wrong CommitTxHash")
require.Equal(t, len(want.HTLCEntries), len(got.HTLCEntries),
"wrong HTLCEntries length")
for i, expectedHTLC := range want.HTLCEntries {
htlc := got.HTLCEntries[i]
require.Equal(t, expectedHTLC.Amt, htlc.Amt, "wrong Amt")
require.Equal(t, expectedHTLC.RHash, htlc.RHash, "wrong RHash")
require.Equal(t, expectedHTLC.Incoming, htlc.Incoming,
"wrong Incoming")
require.Equal(t, expectedHTLC.OutputIndex, htlc.OutputIndex,
"wrong OutputIndex")
require.Equal(t, expectedHTLC.RefundTimeout, htlc.RefundTimeout,
"wrong RefundTimeout")
}
}
// BenchmarkMigration creates a benchmark test for the migration. The test uses
// the flag `-benchtime` to specify how many revocation logs we want to test.
func BenchmarkMigration(b *testing.B) {
// Stop the timer and start it again later when the actual migration
// starts.
b.StopTimer()
// Gather number of records by reading `-benchtime` flag.
numLogs := b.N
// Create a mock store.
mockStore := &mockStore{}
mockStore.On("AddNextEntry", mock.Anything).Return(nil)
mockStore.On("Encode", mock.Anything).Return(nil)
// Build the test data.
oldLogs := make([]mig.ChannelCommitment, numLogs)
beforeMigration := func(db kvdb.Backend) error {
fmt.Printf("\nBuilding test data for %d logs...\n", numLogs)
defer fmt.Println("Finished building test data, migrating...")
// We use a mock store here to bypass the check in
// `AddNextEntry` so we don't need a "read" preimage here. This
// shouldn't affect our benchmark result as the migration will
// load the actual store from db.
c := createTestChannel(nil)
c.RevocationStore = mockStore
// Create the test logs.
for i := 0; i < numLogs; i++ {
oldLog := oldLog2
oldLog.CommitHeight = uint64(i)
oldLogs[i] = oldLog
}
return setupTestLogs(db, c, oldLogs, nil)
}
// Run the migration test.
migtest.ApplyMigrationWithDb(
b,
beforeMigration,
nil,
func(db kvdb.Backend) error {
b.StartTimer()
defer b.StopTimer()
return MigrateRevocationLog(db)
},
)
}

View File

@@ -0,0 +1,551 @@
package migration30
import (
"bytes"
"errors"
"io"
"math"
mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/tlv"
)
// OutputIndexEmpty is used when the output index doesn't exist.
const OutputIndexEmpty = math.MaxUint16
var (
// revocationLogBucketDeprecated is dedicated for storing the necessary
// delta state between channel updates required to re-construct a past
// state in order to punish a counterparty attempting a non-cooperative
// channel closure. This key should be accessed from within the
// sub-bucket of a target channel, identified by its channel point.
//
// Deprecated: This bucket is kept for read-only in case the user
// choose not to migrate the old data.
revocationLogBucketDeprecated = []byte("revocation-log-key")
// revocationLogBucket is a sub-bucket under openChannelBucket. This
// sub-bucket is dedicated for storing the minimal info required to
// re-construct a past state in order to punish a counterparty
// attempting a non-cooperative channel closure.
revocationLogBucket = []byte("revocation-log")
// revocationStateKey stores their current revocation hash, our
// preimage producer and their preimage store.
revocationStateKey = []byte("revocation-state-key")
// ErrNoRevocationsFound is returned when revocation state for a
// particular channel cannot be found.
ErrNoRevocationsFound = errors.New("no revocations found")
// ErrLogEntryNotFound is returned when we cannot find a log entry at
// the height requested in the revocation log.
ErrLogEntryNotFound = errors.New("log entry not found")
// ErrOutputIndexTooBig is returned when the output index is greater
// than uint16.
ErrOutputIndexTooBig = errors.New("output index is over uint16")
)
// HTLCEntry specifies the minimal info needed to be stored on disk for ALL the
// historical HTLCs, which is useful for constructing RevocationLog when a
// breach is detected.
// The actual size of each HTLCEntry varies based on its RHash and Amt(sat),
// summarized as follows,
//
// | RHash empty | Amt<=252 | Amt<=65,535 | Amt<=4,294,967,295 | otherwise |
// |:-----------:|:--------:|:-----------:|:------------------:|:---------:|
// | true | 19 | 21 | 23 | 26 |
// | false | 51 | 53 | 55 | 58 |
//
// So the size varies from 19 bytes to 58 bytes, where most likely to be 23 or
// 55 bytes.
//
// NOTE: all the fields saved to disk use the primitive go types so they can be
// made into tlv records without further conversion.
type HTLCEntry struct {
// RHash is the payment hash of the HTLC.
RHash [32]byte
// RefundTimeout is the absolute timeout on the HTLC that the sender
// must wait before reclaiming the funds in limbo.
RefundTimeout uint32
// OutputIndex is the output index for this particular HTLC output
// within the commitment transaction.
//
// NOTE: we use uint16 instead of int32 here to save us 2 bytes, which
// gives us a max number of HTLCs of 65K.
OutputIndex uint16
// Incoming denotes whether we're the receiver or the sender of this
// HTLC.
//
// NOTE: this field is the memory representation of the field
// incomingUint.
Incoming bool
// Amt is the amount of satoshis this HTLC escrows.
//
// NOTE: this field is the memory representation of the field amtUint.
Amt btcutil.Amount
// amtTlv is the uint64 format of Amt. This field is created so we can
// easily make it into a tlv record and save it to disk.
//
// NOTE: we keep this field for accounting purpose only. If the disk
// space becomes an issue, we could delete this field to save us extra
// 8 bytes.
amtTlv uint64
// incomingTlv is the uint8 format of Incoming. This field is created
// so we can easily make it into a tlv record and save it to disk.
incomingTlv uint8
}
// RHashLen is used by MakeDynamicRecord to return the size of the RHash.
//
// NOTE: for zero hash, we return a length 0.
func (h *HTLCEntry) RHashLen() uint64 {
if h.RHash == lntypes.ZeroHash {
return 0
}
return 32
}
// RHashEncoder is the customized encoder which skips encoding the empty hash.
func RHashEncoder(w io.Writer, val interface{}, buf *[8]byte) error {
v, ok := val.(*[32]byte)
if !ok {
return tlv.NewTypeForEncodingErr(val, "RHash")
}
// If the value is an empty hash, we will skip encoding it.
if *v == lntypes.ZeroHash {
return nil
}
return tlv.EBytes32(w, v, buf)
}
// RHashDecoder is the customized decoder which skips decoding the empty hash.
func RHashDecoder(r io.Reader, val interface{}, buf *[8]byte, l uint64) error {
v, ok := val.(*[32]byte)
if !ok {
return tlv.NewTypeForEncodingErr(val, "RHash")
}
// If the length is zero, we will skip encoding the empty hash.
if l == 0 {
return nil
}
return tlv.DBytes32(r, v, buf, 32)
}
// toTlvStream converts an HTLCEntry record into a tlv representation.
func (h *HTLCEntry) toTlvStream() (*tlv.Stream, error) {
const (
// A set of tlv type definitions used to serialize htlc entries
// to the database. We define it here instead of the head of
// the file to avoid naming conflicts.
//
// NOTE: A migration should be added whenever this list
// changes.
rHashType tlv.Type = 0
refundTimeoutType tlv.Type = 1
outputIndexType tlv.Type = 2
incomingType tlv.Type = 3
amtType tlv.Type = 4
)
return tlv.NewStream(
tlv.MakeDynamicRecord(
rHashType, &h.RHash, h.RHashLen,
RHashEncoder, RHashDecoder,
),
tlv.MakePrimitiveRecord(
refundTimeoutType, &h.RefundTimeout,
),
tlv.MakePrimitiveRecord(
outputIndexType, &h.OutputIndex,
),
tlv.MakePrimitiveRecord(incomingType, &h.incomingTlv),
// We will save 3 bytes if the amount is less or equal to
// 4,294,967,295 msat, or roughly 0.043 bitcoin.
tlv.MakeBigSizeRecord(amtType, &h.amtTlv),
)
}
// RevocationLog stores the info needed to construct a breach retribution. Its
// fields can be viewed as a subset of a ChannelCommitment's. In the database,
// all historical versions of the RevocationLog are saved using the
// CommitHeight as the key.
//
// NOTE: all the fields use the primitive go types so they can be made into tlv
// records without further conversion.
type RevocationLog struct {
// OurOutputIndex specifies our output index in this commitment. In a
// remote commitment transaction, this is the to remote output index.
OurOutputIndex uint16
// TheirOutputIndex specifies their output index in this commitment. In
// a remote commitment transaction, this is the to local output index.
TheirOutputIndex uint16
// CommitTxHash is the hash of the latest version of the commitment
// state, broadcast able by us.
CommitTxHash [32]byte
// HTLCEntries is the set of HTLCEntry's that are pending at this
// particular commitment height.
HTLCEntries []*HTLCEntry
}
// toTlvStream converts an RevocationLog record into a tlv representation.
func (rl *RevocationLog) toTlvStream() (*tlv.Stream, error) {
const (
// A set of tlv type definitions used to serialize the body of
// revocation logs to the database. We define it here instead
// of the head of the file to avoid naming conflicts.
//
// NOTE: A migration should be added whenever this list
// changes.
ourOutputIndexType tlv.Type = 0
theirOutputIndexType tlv.Type = 1
commitTxHashType tlv.Type = 2
)
return tlv.NewStream(
tlv.MakePrimitiveRecord(ourOutputIndexType, &rl.OurOutputIndex),
tlv.MakePrimitiveRecord(
theirOutputIndexType, &rl.TheirOutputIndex,
),
tlv.MakePrimitiveRecord(commitTxHashType, &rl.CommitTxHash),
)
}
// putRevocationLog uses the fields `CommitTx` and `Htlcs` from a
// ChannelCommitment to construct a revocation log entry and saves them to
// disk. It also saves our output index and their output index, which are
// useful when creating breach retribution.
func putRevocationLog(bucket kvdb.RwBucket, commit *mig.ChannelCommitment,
ourOutputIndex, theirOutputIndex uint32) error {
// Sanity check that the output indexes can be safely converted.
if ourOutputIndex > math.MaxUint16 {
return ErrOutputIndexTooBig
}
if theirOutputIndex > math.MaxUint16 {
return ErrOutputIndexTooBig
}
rl := &RevocationLog{
OurOutputIndex: uint16(ourOutputIndex),
TheirOutputIndex: uint16(theirOutputIndex),
CommitTxHash: commit.CommitTx.TxHash(),
HTLCEntries: make([]*HTLCEntry, 0, len(commit.Htlcs)),
}
for _, htlc := range commit.Htlcs {
// Skip dust HTLCs.
if htlc.OutputIndex < 0 {
continue
}
// Sanity check that the output indexes can be safely
// converted.
if htlc.OutputIndex > math.MaxUint16 {
return ErrOutputIndexTooBig
}
entry := &HTLCEntry{
RHash: htlc.RHash,
RefundTimeout: htlc.RefundTimeout,
Incoming: htlc.Incoming,
OutputIndex: uint16(htlc.OutputIndex),
Amt: htlc.Amt.ToSatoshis(),
}
rl.HTLCEntries = append(rl.HTLCEntries, entry)
}
var b bytes.Buffer
err := serializeRevocationLog(&b, rl)
if err != nil {
return err
}
logEntrykey := mig24.MakeLogKey(commit.CommitHeight)
return bucket.Put(logEntrykey[:], b.Bytes())
}
// fetchRevocationLog queries the revocation log bucket to find an log entry.
// Return an error if not found.
func fetchRevocationLog(log kvdb.RBucket,
updateNum uint64) (RevocationLog, error) {
logEntrykey := mig24.MakeLogKey(updateNum)
commitBytes := log.Get(logEntrykey[:])
if commitBytes == nil {
return RevocationLog{}, ErrLogEntryNotFound
}
commitReader := bytes.NewReader(commitBytes)
return deserializeRevocationLog(commitReader)
}
// serializeRevocationLog serializes a RevocationLog record based on tlv
// format.
func serializeRevocationLog(w io.Writer, rl *RevocationLog) error {
// Create the tlv stream.
tlvStream, err := rl.toTlvStream()
if err != nil {
return err
}
// Write the tlv stream.
if err := writeTlvStream(w, tlvStream); err != nil {
return err
}
// Write the HTLCs.
return serializeHTLCEntries(w, rl.HTLCEntries)
}
// serializeHTLCEntries serializes a list of HTLCEntry records based on tlv
// format.
func serializeHTLCEntries(w io.Writer, htlcs []*HTLCEntry) error {
for _, htlc := range htlcs {
// Patch the incomingTlv field.
if htlc.Incoming {
htlc.incomingTlv = 1
}
// Patch the amtTlv field.
htlc.amtTlv = uint64(htlc.Amt)
// Create the tlv stream.
tlvStream, err := htlc.toTlvStream()
if err != nil {
return err
}
// Write the tlv stream.
if err := writeTlvStream(w, tlvStream); err != nil {
return err
}
}
return nil
}
// deserializeRevocationLog deserializes a RevocationLog based on tlv format.
func deserializeRevocationLog(r io.Reader) (RevocationLog, error) {
var rl RevocationLog
// Create the tlv stream.
tlvStream, err := rl.toTlvStream()
if err != nil {
return rl, err
}
// Read the tlv stream.
if err := readTlvStream(r, tlvStream); err != nil {
return rl, err
}
// Read the HTLC entries.
rl.HTLCEntries, err = deserializeHTLCEntries(r)
return rl, err
}
// deserializeHTLCEntries deserializes a list of HTLC entries based on tlv
// format.
func deserializeHTLCEntries(r io.Reader) ([]*HTLCEntry, error) {
var htlcs []*HTLCEntry
for {
var htlc HTLCEntry
// Create the tlv stream.
tlvStream, err := htlc.toTlvStream()
if err != nil {
return nil, err
}
// Read the HTLC entry.
if err := readTlvStream(r, tlvStream); err != nil {
// We've reached the end when hitting an EOF.
if err == io.ErrUnexpectedEOF {
break
}
return nil, err
}
// Patch the Incoming field.
if htlc.incomingTlv == 1 {
htlc.Incoming = true
}
// Patch the Amt field.
htlc.Amt = btcutil.Amount(htlc.amtTlv)
// Append the entry.
htlcs = append(htlcs, &htlc)
}
return htlcs, nil
}
// writeTlvStream is a helper function that encodes the tlv stream into the
// writer.
func writeTlvStream(w io.Writer, s *tlv.Stream) error {
var b bytes.Buffer
if err := s.Encode(&b); err != nil {
return err
}
// Write the stream's length as a varint.
err := tlv.WriteVarInt(w, uint64(b.Len()), &[8]byte{})
if err != nil {
return err
}
if _, err = w.Write(b.Bytes()); err != nil {
return err
}
return nil
}
// readTlvStream is a helper function that decodes the tlv stream from the
// reader.
func readTlvStream(r io.Reader, s *tlv.Stream) error {
var bodyLen uint64
// Read the stream's length.
bodyLen, err := tlv.ReadVarInt(r, &[8]byte{})
switch {
// We'll convert any EOFs to ErrUnexpectedEOF, since this results in an
// invalid record.
case err == io.EOF:
return io.ErrUnexpectedEOF
// Other unexpected errors.
case err != nil:
return err
}
// TODO(yy): add overflow check.
lr := io.LimitReader(r, int64(bodyLen))
return s.Decode(lr)
}
// fetchLogBucket returns a read bucket by visiting both the new and the old
// bucket.
func fetchLogBucket(chanBucket kvdb.RBucket) (kvdb.RBucket, error) {
logBucket := chanBucket.NestedReadBucket(revocationLogBucket)
if logBucket == nil {
logBucket = chanBucket.NestedReadBucket(
revocationLogBucketDeprecated,
)
if logBucket == nil {
return nil, mig25.ErrNoPastDeltas
}
}
return logBucket, nil
}
// putOldRevocationLog saves a revocation log using the old format.
func putOldRevocationLog(log kvdb.RwBucket,
commit *mig.ChannelCommitment) error {
var b bytes.Buffer
if err := mig.SerializeChanCommit(&b, commit); err != nil {
return err
}
logEntrykey := mig24.MakeLogKey(commit.CommitHeight)
return log.Put(logEntrykey[:], b.Bytes())
}
func putChanRevocationState(chanBucket kvdb.RwBucket,
channel *mig26.OpenChannel) error {
var b bytes.Buffer
err := mig.WriteElements(
&b, channel.RemoteCurrentRevocation, channel.RevocationProducer,
channel.RevocationStore,
)
if err != nil {
return err
}
// TODO(roasbeef): don't keep producer on disk
// If the next revocation is present, which is only the case after the
// FundingLocked message has been sent, then we'll write it to disk.
if channel.RemoteNextRevocation != nil {
err = mig.WriteElements(&b, channel.RemoteNextRevocation)
if err != nil {
return err
}
}
return chanBucket.Put(revocationStateKey, b.Bytes())
}
func fetchChanRevocationState(chanBucket kvdb.RBucket,
c *mig26.OpenChannel) error {
revBytes := chanBucket.Get(revocationStateKey)
if revBytes == nil {
return ErrNoRevocationsFound
}
r := bytes.NewReader(revBytes)
err := mig.ReadElements(
r, &c.RemoteCurrentRevocation, &c.RevocationProducer,
&c.RevocationStore,
)
if err != nil {
return err
}
// If there aren't any bytes left in the buffer, then we don't yet have
// the next remote revocation, so we can exit early here.
if r.Len() == 0 {
return nil
}
// Otherwise we'll read the next revocation for the remote party which
// is always the last item within the buffer.
return mig.ReadElements(r, &c.RemoteNextRevocation)
}
func findOutputIndexes(chanState *mig26.OpenChannel,
oldLog *mig.ChannelCommitment) (uint32, uint32, error) {
// With the state number broadcast known, we can now derive/restore the
// proper revocation preimage necessary to sweep the remote party's
// output.
revocationPreimage, err := chanState.RevocationStore.LookUp(
oldLog.CommitHeight,
)
if err != nil {
return 0, 0, err
}
return findOutputIndexesFromRemote(
revocationPreimage, chanState, oldLog,
)
}

View File

@@ -0,0 +1,51 @@
package migration30
import (
"encoding/binary"
"io"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/shachain"
"github.com/stretchr/testify/mock"
)
// mockStore mocks the shachain.Store.
type mockStore struct {
mock.Mock
}
// A compile time check to ensure mockStore implements the Store interface.
var _ shachain.Store = (*mockStore)(nil)
func (m *mockStore) LookUp(height uint64) (*chainhash.Hash, error) {
args := m.Called(height)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*chainhash.Hash), args.Error(1)
}
func (m *mockStore) AddNextEntry(preimage *chainhash.Hash) error {
args := m.Called(preimage)
return args.Error(0)
}
// Encode encodes a series of dummy values to pass the serialize/deserialize
// process.
func (m *mockStore) Encode(w io.Writer) error {
err := binary.Write(w, binary.BigEndian, int8(1))
if err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, uint64(0)); err != nil {
return err
}
if _, err = w.Write(preimage2); err != nil {
return err
}
return binary.Write(w, binary.BigEndian, uint64(0))
}

View File

@@ -0,0 +1,554 @@
package migration30
import (
"bytes"
"fmt"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/shachain"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
)
var (
testChainHash = chainhash.Hash{1, 2, 3}
testChanType = mig25.SingleFunderTweaklessBit
// testOurIndex and testTheirIndex are artificial indexes that're saved
// to db during test setup. They are different from indexes populated
// from the actual migration process so we can check whether a new
// revocation log is overwritten or not.
testOurIndex = uint32(100)
testTheirIndex = uint32(200)
// dummyInput is used in our commit tx.
dummyInput = &wire.TxIn{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
Sequence: 0xffffffff,
}
// htlcScript is the PkScript used in the HTLC output. This script
// corresponds to revocation preimage2.
htlcScript = []byte{
0x0, 0x20, 0x3d, 0x51, 0x66, 0xda, 0x39, 0x93,
0x7b, 0x49, 0xaf, 0x2, 0xf2, 0x2f, 0x90, 0x52,
0x8e, 0x45, 0x24, 0x34, 0x8f, 0xd8, 0x76, 0x7,
0x5a, 0xfc, 0x52, 0x8d, 0x68, 0xdd, 0xbc, 0xce,
0x3e, 0x5d,
}
// toLocalScript is the PkScript used in to-local output.
toLocalScript = []byte{
0x0, 0x14, 0xc6, 0x9, 0x62, 0xab, 0x60, 0xbe,
0x40, 0xd, 0xab, 0x31, 0xc, 0x13, 0x14, 0x15,
0x93, 0xe6, 0xa2, 0x94, 0xe4, 0x2a,
}
// preimage1 defines a revocation preimage, generated from itest.
preimage1 = []byte{
0x95, 0xb4, 0x7c, 0x5a, 0x2b, 0xfd, 0x6f, 0xf4,
0x70, 0x8, 0xc, 0x70, 0x82, 0x36, 0xc8, 0x5,
0x88, 0x16, 0xaf, 0x29, 0xb5, 0x8, 0xfd, 0x5a,
0x40, 0x28, 0x24, 0xc, 0x2a, 0x7f, 0x96, 0xcd,
}
// commitTx1 is the tx saved in the first old revocation.
commitTx1 = &wire.MsgTx{
Version: 2,
// Add a dummy input.
TxIn: []*wire.TxIn{dummyInput},
TxOut: []*wire.TxOut{
{
Value: 990_950,
PkScript: toLocalScript,
},
},
}
// logHeight1 is the CommitHeight used by oldLog1.
logHeight1 = uint64(0)
// oldLog1 defines an old revocation that has no HTLCs.
oldLog1 = mig.ChannelCommitment{
CommitHeight: logHeight1,
LocalLogIndex: 0,
LocalHtlcIndex: 0,
RemoteLogIndex: 0,
RemoteHtlcIndex: 0,
LocalBalance: lnwire.MilliSatoshi(990_950_000),
RemoteBalance: 0,
CommitTx: commitTx1,
}
// newLog1 is the new version of oldLog1.
newLog1 = RevocationLog{
OurOutputIndex: 0,
TheirOutputIndex: OutputIndexEmpty,
CommitTxHash: commitTx1.TxHash(),
}
// preimage2 defines the second revocation preimage used in the test,
// generated from itest.
preimage2 = []byte{
0xac, 0x60, 0x7a, 0x59, 0x9, 0xd6, 0x11, 0xb2,
0xf5, 0x6e, 0xaa, 0xc6, 0xb9, 0x0, 0x12, 0xdc,
0xf0, 0x89, 0x58, 0x90, 0x8a, 0xa2, 0xc6, 0xfc,
0xf1, 0x2, 0x74, 0x87, 0x30, 0x51, 0x5e, 0xea,
}
// commitTx2 is the tx saved in the second old revocation.
commitTx2 = &wire.MsgTx{
Version: 2,
// Add a dummy input.
TxIn: []*wire.TxIn{dummyInput},
TxOut: []*wire.TxOut{
{
Value: 100_000,
PkScript: htlcScript,
},
{
Value: 888_800,
PkScript: toLocalScript,
},
},
}
// rHash is the payment hash used in the htlc below.
rHash = [32]byte{
0x42, 0x5e, 0xd4, 0xe4, 0xa3, 0x6b, 0x30, 0xea,
0x21, 0xb9, 0xe, 0x21, 0xc7, 0x12, 0xc6, 0x49,
0xe8, 0x21, 0x4c, 0x29, 0xb7, 0xea, 0xf6, 0x80,
0x89, 0xd1, 0x3, 0x9c, 0x6e, 0x55, 0x38, 0x4c,
}
// htlc defines an HTLC that's saved in the old revocation log.
htlc = mig.HTLC{
RHash: rHash,
Amt: lnwire.MilliSatoshi(100_000_000),
RefundTimeout: 489,
OutputIndex: 0,
Incoming: false,
OnionBlob: bytes.Repeat([]byte{0xff}, 1366),
HtlcIndex: 0,
LogIndex: 0,
}
// logHeight2 is the CommitHeight used by oldLog2.
logHeight2 = uint64(1)
// oldLog2 defines an old revocation that has one HTLC.
oldLog2 = mig.ChannelCommitment{
CommitHeight: logHeight2,
LocalLogIndex: 1,
LocalHtlcIndex: 1,
RemoteLogIndex: 0,
RemoteHtlcIndex: 0,
LocalBalance: lnwire.MilliSatoshi(888_800_000),
RemoteBalance: 0,
CommitTx: commitTx2,
Htlcs: []mig.HTLC{htlc},
}
// newLog2 is the new version of the oldLog2.
newLog2 = RevocationLog{
OurOutputIndex: 1,
TheirOutputIndex: OutputIndexEmpty,
CommitTxHash: commitTx2.TxHash(),
HTLCEntries: []*HTLCEntry{
{
RHash: rHash,
RefundTimeout: 489,
OutputIndex: 0,
Incoming: false,
Amt: btcutil.Amount(100_000),
},
},
}
// newLog3 defines an revocation log that's been created after v0.15.0.
newLog3 = mig.ChannelCommitment{
CommitHeight: logHeight2 + 1,
LocalLogIndex: 1,
LocalHtlcIndex: 1,
RemoteLogIndex: 0,
RemoteHtlcIndex: 0,
LocalBalance: lnwire.MilliSatoshi(888_800_000),
RemoteBalance: 0,
CommitTx: commitTx2,
Htlcs: []mig.HTLC{htlc},
}
// The following public keys are taken from the itest results.
localMusigKey, _ = btcec.ParsePubKey([]byte{
0x2,
0xda, 0x42, 0xa4, 0x4a, 0x6b, 0x42, 0xfe, 0xcb,
0x2f, 0x7e, 0x35, 0x89, 0x99, 0xdd, 0x43, 0xba,
0x4b, 0xf1, 0x9c, 0xf, 0x18, 0xef, 0x9, 0x83,
0x35, 0x31, 0x59, 0xa4, 0x3b, 0xde, 0xa, 0xde,
})
localRevocationBasePoint, _ = btcec.ParsePubKey([]byte{
0x2,
0x6, 0x16, 0xd1, 0xb1, 0x4f, 0xee, 0x11, 0x86,
0x55, 0xfe, 0x31, 0x66, 0x6f, 0x43, 0x1, 0x80,
0xa8, 0xa7, 0x5c, 0x2, 0x92, 0xe5, 0x7c, 0x4,
0x31, 0xa6, 0xcf, 0x43, 0xb6, 0xdb, 0xe6, 0x10,
})
localPaymentBasePoint, _ = btcec.ParsePubKey([]byte{
0x2,
0x88, 0x65, 0x16, 0xc2, 0x37, 0x3f, 0xc5, 0x16,
0x62, 0x71, 0x0, 0xdd, 0x4d, 0x43, 0x28, 0x43,
0x32, 0x91, 0x75, 0xcc, 0xd8, 0x81, 0xb6, 0xb0,
0xd8, 0x96, 0x78, 0xad, 0x18, 0x3b, 0x16, 0xe1,
})
localDelayBasePoint, _ = btcec.ParsePubKey([]byte{
0x2,
0xea, 0x41, 0x48, 0x11, 0x2, 0x59, 0xe3, 0x5c,
0x51, 0x15, 0x90, 0x25, 0x4a, 0x61, 0x5, 0x51,
0xb3, 0x8, 0xe9, 0xd5, 0xf, 0xc6, 0x91, 0x25,
0x14, 0xd2, 0xcf, 0xc8, 0xc5, 0x5b, 0xd9, 0x88,
})
localHtlcBasePoint, _ = btcec.ParsePubKey([]byte{
0x3,
0xfa, 0x1f, 0x6, 0x3a, 0xa4, 0x75, 0x2e, 0x74,
0x3e, 0x55, 0x9, 0x20, 0x6e, 0xf6, 0xa8, 0xe1,
0xd7, 0x61, 0x50, 0x75, 0xa8, 0x34, 0x15, 0xc3,
0x6b, 0xdc, 0xb0, 0xbf, 0xaa, 0x66, 0xd7, 0xa7,
})
remoteMultiSigKey, _ = btcec.ParsePubKey([]byte{
0x2,
0x2b, 0x88, 0x7c, 0x6a, 0xf8, 0xb3, 0x51, 0x61,
0xd3, 0x1c, 0xf1, 0xe4, 0x43, 0xc2, 0x8c, 0x5e,
0xfa, 0x8e, 0xb5, 0xe9, 0xd0, 0x14, 0xb5, 0x33,
0x6a, 0xcc, 0xd, 0x11, 0x42, 0xb8, 0x4b, 0x7d,
})
remoteRevocationBasePoint, _ = btcec.ParsePubKey([]byte{
0x2,
0x6c, 0x39, 0xa3, 0x6d, 0x93, 0x69, 0xac, 0x14,
0x1f, 0xbb, 0x4, 0x86, 0x3, 0x82, 0x5, 0xe2,
0xcb, 0xb0, 0x62, 0x41, 0xa, 0x93, 0x3, 0x6c,
0x8d, 0xc0, 0x42, 0x4d, 0x9e, 0x51, 0x9b, 0x36,
})
remotePaymentBasePoint, _ = btcec.ParsePubKey([]byte{
0x3,
0xab, 0x74, 0x1e, 0x83, 0x48, 0xe3, 0xb5, 0x6,
0x25, 0x1c, 0x80, 0xe7, 0xf2, 0x3e, 0x7d, 0xb7,
0x7a, 0xc7, 0xd, 0x6, 0x3b, 0xbc, 0x74, 0x96,
0x8e, 0x9b, 0x2d, 0xd1, 0x42, 0x71, 0xa5, 0x2a,
})
remoteDelayBasePoint, _ = btcec.ParsePubKey([]byte{
0x2,
0x4b, 0xdd, 0x52, 0x46, 0x1b, 0x50, 0x89, 0xb9,
0x49, 0x4, 0xf2, 0xd2, 0x98, 0x7d, 0x51, 0xa1,
0xa6, 0x3f, 0x9b, 0xd0, 0x40, 0x7c, 0x93, 0x74,
0x3b, 0x8c, 0x4d, 0x63, 0x32, 0x90, 0xa, 0xca,
})
remoteHtlcBasePoint, _ = btcec.ParsePubKey([]byte{
0x3,
0x5b, 0x8f, 0x4a, 0x71, 0x4c, 0x2e, 0x71, 0x14,
0x86, 0x1f, 0x30, 0x96, 0xc0, 0xd4, 0x11, 0x76,
0xf8, 0xc3, 0xfc, 0x7, 0x2d, 0x15, 0x99, 0x55,
0x8, 0x69, 0xf6, 0x1, 0xa2, 0xcd, 0x6b, 0xa7,
})
)
// setupTestLogs takes care of creating the related buckets and inserts testing
// records.
func setupTestLogs(db kvdb.Backend, c *mig26.OpenChannel,
oldLogs, newLogs []mig.ChannelCommitment) error {
return kvdb.Update(db, func(tx kvdb.RwTx) error {
// If the open channel is nil, only create the root
// bucket and skip creating the channel bucket.
if c == nil {
_, err := tx.CreateTopLevelBucket(openChannelBucket)
return err
}
// Create test buckets.
chanBucket, err := mig25.CreateChanBucket(tx, &c.OpenChannel)
if err != nil {
return err
}
// Save channel info.
if err := mig26.PutChanInfo(chanBucket, c, false); err != nil {
return fmt.Errorf("PutChanInfo got %v", err)
}
// Save revocation state.
if err := putChanRevocationState(chanBucket, c); err != nil {
return fmt.Errorf("putChanRevocationState got %v", err)
}
// Create old logs.
err = writeOldRevocationLogs(chanBucket, oldLogs)
if err != nil {
return fmt.Errorf("write old logs: %v", err)
}
// Create new logs.
return writeNewRevocationLogs(chanBucket, newLogs)
}, func() {})
}
// createTestChannel creates an OpenChannel using the specified nodePub and
// outpoint. If any of the params is nil, a random value is populated.
func createTestChannel(nodePub *btcec.PublicKey) *mig26.OpenChannel {
// Create a random private key that's used to provide randomness.
priv, _ := btcec.NewPrivateKey()
// If passed public key is nil, use the random public key.
if nodePub == nil {
nodePub = priv.PubKey()
}
// Create a random channel point.
var op wire.OutPoint
copy(op.Hash[:], priv.Serialize())
testProducer := shachain.NewRevocationProducer(op.Hash)
store, _ := createTestStore()
localCfg := mig.ChannelConfig{
ChannelConstraints: mig.ChannelConstraints{
DustLimit: btcutil.Amount(354),
MaxAcceptedHtlcs: 483,
CsvDelay: 4,
},
MultiSigKey: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: localMusigKey,
},
RevocationBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 1,
Index: 0,
},
PubKey: localRevocationBasePoint,
},
HtlcBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 2,
Index: 0,
},
PubKey: localHtlcBasePoint,
},
PaymentBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 3,
Index: 0,
},
PubKey: localPaymentBasePoint,
},
DelayBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 4,
Index: 0,
},
PubKey: localDelayBasePoint,
},
}
remoteCfg := mig.ChannelConfig{
ChannelConstraints: mig.ChannelConstraints{
DustLimit: btcutil.Amount(354),
MaxAcceptedHtlcs: 483,
CsvDelay: 4,
},
MultiSigKey: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: remoteMultiSigKey,
},
RevocationBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: remoteRevocationBasePoint,
},
HtlcBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: remoteHtlcBasePoint,
},
PaymentBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: remotePaymentBasePoint,
},
DelayBasePoint: keychain.KeyDescriptor{
KeyLocator: keychain.KeyLocator{
Family: 0,
Index: 0,
},
PubKey: remoteDelayBasePoint,
},
}
c := &mig26.OpenChannel{
OpenChannel: mig25.OpenChannel{
OpenChannel: mig.OpenChannel{
ChainHash: testChainHash,
IdentityPub: nodePub,
FundingOutpoint: op,
LocalChanCfg: localCfg,
RemoteChanCfg: remoteCfg,
// Assign dummy values.
RemoteCurrentRevocation: nodePub,
RevocationProducer: testProducer,
RevocationStore: store,
},
ChanType: testChanType,
},
}
return c
}
// writeOldRevocationLogs saves an old revocation log to db.
func writeOldRevocationLogs(chanBucket kvdb.RwBucket,
oldLogs []mig.ChannelCommitment) error {
// Don't bother continue if the logs are empty.
if len(oldLogs) == 0 {
return nil
}
logBucket, err := chanBucket.CreateBucketIfNotExists(
revocationLogBucketDeprecated,
)
if err != nil {
return err
}
for _, c := range oldLogs {
if err := putOldRevocationLog(logBucket, &c); err != nil {
return err
}
}
return nil
}
// writeNewRevocationLogs saves a new revocation log to db.
func writeNewRevocationLogs(chanBucket kvdb.RwBucket,
oldLogs []mig.ChannelCommitment) error {
// Don't bother continue if the logs are empty.
if len(oldLogs) == 0 {
return nil
}
logBucket, err := chanBucket.CreateBucketIfNotExists(
revocationLogBucket,
)
if err != nil {
return err
}
for _, c := range oldLogs {
// NOTE: we just blindly write the output indexes to db here
// whereas normally, we would find the correct indexes from the
// old commit tx. We do this intentionally so we can
// distinguish a newly created log from an already saved one.
err := putRevocationLog(
logBucket, &c, testOurIndex, testTheirIndex,
)
if err != nil {
return err
}
}
return nil
}
// createTestStore creates a revocation store and always saves the above
// defined two preimages into the store.
func createTestStore() (shachain.Store, error) {
var p chainhash.Hash
copy(p[:], preimage1)
testStore := shachain.NewRevocationStore()
if err := testStore.AddNextEntry(&p); err != nil {
return nil, err
}
copy(p[:], preimage2)
if err := testStore.AddNextEntry(&p); err != nil {
return nil, err
}
return testStore, nil
}
// createNotStarted will setup a situation where we haven't started the
// migration for the channel. We use the legacy to denote whether to simulate a
// node with v0.15.0.
func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
var newLogs []mig.ChannelCommitment
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = []mig.ChannelCommitment{newLog3}
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}
// createNotFinished will setup a situation where we have un-migrated logs and
// return the next migration height. We use the legacy to denote whether to
// simulate a node with v0.15.0.
func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
newLogs := []mig.ChannelCommitment{oldLog1}
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = append(newLogs, newLog3)
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}
// createFinished will setup a situation where all the old logs have been
// migrated and return a nil. We use the legacy to denote whether to simulate a
// node with v0.15.0.
func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel,
legacy bool) error {
// Create test logs.
oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
newLogs := []mig.ChannelCommitment{oldLog1, oldLog2}
// Add a new log if the node is running with v0.15.0.
if !legacy {
newLogs = append(newLogs, newLog3)
}
return setupTestLogs(cdb, c, oldLogs, newLogs)
}

View File

@@ -84,6 +84,45 @@ func ApplyMigration(t *testing.T,
}
}
// ApplyMigrationWithDb is a helper test function that encapsulates the general
// steps which are needed to properly check the result of applying migration
// function. This function differs from ApplyMigration as it requires the
// supplied migration functions to take a db instance and construct their own
// database transactions.
func ApplyMigrationWithDb(t testing.TB, beforeMigration, afterMigration,
migrationFunc func(db kvdb.Backend) error) {
t.Helper()
cdb, cleanUp, err := MakeDB()
defer cleanUp()
if err != nil {
t.Fatal(err)
}
// beforeMigration usually used for populating the database
// with test data.
if err := beforeMigration(cdb); err != nil {
t.Fatalf("beforeMigration error: %v", err)
}
// Apply migration.
if err := migrationFunc(cdb); err != nil {
t.Fatalf("migrationFunc error: %v", err)
}
// If there's no afterMigration, exit here.
if afterMigration == nil {
return
}
// afterMigration usually used for checking the database state
// and throwing the error if something went wrong.
if err := afterMigration(cdb); err != nil {
t.Fatalf("afterMigration error: %v", err)
}
}
func newError(e interface{}) error {
var err error
switch e := e.(type) {

View File

@@ -25,9 +25,18 @@ const (
DefaultPreAllocCacheNumNodes = 15000
)
// OptionalMiragtionConfig defines the flags used to signal whether a
// particular migration needs to be applied.
type OptionalMiragtionConfig struct {
// PruneRevocationLog specifies that the revocation log migration needs
// to be applied.
PruneRevocationLog bool
}
// Options holds parameters for tuning and customizing a channeldb.DB.
type Options struct {
kvdb.BoltBackendConfig
OptionalMiragtionConfig
// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
@@ -76,6 +85,7 @@ func DefaultOptions() Options {
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
DBTimeout: kvdb.DefaultDBTimeout,
},
OptionalMiragtionConfig: OptionalMiragtionConfig{},
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
@@ -176,3 +186,11 @@ func OptionKeepFailedPaymentAttempts(keepFailedPaymentAttempts bool) OptionModif
o.keepFailedPaymentAttempts = keepFailedPaymentAttempts
}
}
// OptionPruneRevocationLog specifies whether the migration for pruning
// revocation logs needs to be applied or not.
func OptionPruneRevocationLog(prune bool) OptionModifier {
return func(o *Options) {
o.OptionalMiragtionConfig.PruneRevocationLog = prune
}
}

View File

@@ -857,6 +857,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
channeldb.OptionSetUseGraphCache(!cfg.DB.NoGraphCache),
channeldb.OptionKeepFailedPaymentAttempts(cfg.KeepFailedPaymentAttempts),
channeldb.OptionPruneRevocationLog(cfg.DB.PruneRevocation),
}
// We want to pre-allocate the channel graph cache according to what we

View File

@@ -35,6 +35,23 @@
* [Delete failed payment attempts](https://github.com/lightningnetwork/lnd/pull/6438)
once payments are settled, unless specified with `keep-failed-payment-attempts` flag.
* [A new db configuration flag
`db.prune-revocation`](https://github.com/lightningnetwork/lnd/pull/6469) is
introduced to take the advantage enabled by [a recent space
optimization](https://github.com/lightningnetwork/lnd/pull/6347). Users can
set this flag to `true` to run an optional db migration during `lnd`'s
startup. This flag will prune the old revocation logs and save them using the
new format that can save large amount of disk space.
For a busy channel with millions of updates, this migration can take quite
some time. The benchmark shows it takes roughly 70 seconds to finish a
migration with 1 million logs. Of course the actual time taken can vary from
machine to machine. Users can run the following benchmark test to get an
accurate time it'll take for a channel with 1 millions updates to plan ahead,
```sh
cd ./channeldb/migration30
go test -bench=. -run=TestMigrateRevocationLogMemCap -benchtime=1000000x -timeout=10m -benchmem
```
## Documentation
* [Add minor comment](https://github.com/lightningnetwork/lnd/pull/6559) on

View File

@@ -61,6 +61,8 @@ type DB struct {
Postgres *postgres.Config `group:"postgres" namespace:"postgres" description:"Postgres settings."`
NoGraphCache bool `long:"no-graph-cache" description:"Don't use the in-memory graph cache for path finding. Much slower but uses less RAM. Can only be used with a bolt database backend."`
PruneRevocation bool `long:"prune-revocation" description:"Run the optional migration that prunes the revocation logs to save disk space."`
}
// DefaultDB creates and returns a new default DB config.

View File

@@ -1205,6 +1205,12 @@ litecoin.node=ltcd
; less RAM. Can only be used with a bolt database backend.
; db.no-graph-cache=true
; Specify whether the optional migration for pruning old revocation logs
; should be applied. This migration will only save disk space if there are open
; channels prior to lnd@v0.15.0.
; db.prune-revocation=false
[etcd]
; Etcd database host.