watchtower/wtdb: add ability for a multi-tx db migration

In this commit, we add the ability to add a wtdb version migration that
does not get given a transaction but rather a whole db object. This will
be useful for migrations that are best done in multiple transaction in
order to use less RAM.
This commit is contained in:
Elle Mouton
2022-12-23 10:25:34 +02:00
parent af076d8ff4
commit 870a91a1e8

View File

@@ -1,6 +1,8 @@
package wtdb package wtdb
import ( import (
"fmt"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration1" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration1"
@@ -8,15 +10,25 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
) )
// migration is a function which takes a prior outdated version of the database // txMigration is a function which takes a prior outdated version of the
// instances and mutates the key/bucket structure to arrive at a more // database instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database. // up-to-date version of the database. It uses an existing database transaction
type migration func(tx kvdb.RwTx) error // to do so.
type txMigration func(tx kvdb.RwTx) error
// dbMigration is a function which takes a prior outdated version of the
// database instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database. If such a migration is defined, the
// migration is responsible for starting any database transactions. This
// migration type is useful in the case where it would be beneficial (in terms
// of RAM usage) to split the migration between multiple db transactions.
type dbMigration func(db kvdb.Backend) error
// version pairs a version number with the migration that would need to be // version pairs a version number with the migration that would need to be
// applied from the prior version to upgrade. // applied from the prior version to upgrade.
type version struct { type version struct {
migration migration txMigration txMigration
dbMigration dbMigration
} }
// towerDBVersions stores all versions and migrations of the tower database. // towerDBVersions stores all versions and migrations of the tower database.
@@ -29,13 +41,13 @@ var towerDBVersions = []version{}
// migrations must be applied. // migrations must be applied.
var clientDBVersions = []version{ var clientDBVersions = []version{
{ {
migration: migration1.MigrateTowerToSessionIndex, txMigration: migration1.MigrateTowerToSessionIndex,
}, },
{ {
migration: migration2.MigrateClientChannelDetails, txMigration: migration2.MigrateClientChannelDetails,
}, },
{ {
migration: migration3.MigrateChannelIDIndex, txMigration: migration3.MigrateChannelIDIndex,
}, },
} }
@@ -154,23 +166,57 @@ func syncVersions(db versionedDB, versions []version) error {
// Otherwise, apply any migrations in order to bring the database // Otherwise, apply any migrations in order to bring the database
// version up to the highest known version. // version up to the highest known version.
updates := getMigrations(versions, curVersion) updates := getMigrations(versions, curVersion)
return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
for i, update := range updates { for i, update := range updates {
if update.migration == nil { if update.dbMigration != nil && update.txMigration != nil {
continue return fmt.Errorf("cannot specify both a " +
"tx-migration and a db-migration for a " +
"single version")
} }
version := curVersion + uint32(i) + 1 version := curVersion + uint32(i) + 1
log.Infof("Applying migration #%d", version) log.Infof("Applying migration #%d", version)
err := update.migration(tx) if update.dbMigration != nil {
err = update.dbMigration(db.bdb())
if err != nil { if err != nil {
log.Errorf("Unable to apply migration #%d: %v", log.Errorf("Unable to apply migration #%d: %v",
version, err) version, err)
return err return err
} }
// Note that unlike a txMigration, here we update the
// db version in a transaction that is separate to the
// transaction in which the db migration took place.
// This means that the db migration function must be
// idempotent.
err = kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
return putDBVersion(tx, version)
}, func() {})
if err != nil {
return err
} }
return putDBVersion(tx, latestVersion) continue
}, func() {}) }
if update.txMigration == nil {
continue
}
err = kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
err := update.txMigration(tx)
if err != nil {
log.Errorf("Unable to apply migration #%d: %v",
version, err)
return err
}
return putDBVersion(tx, version)
}, func() {})
if err != nil {
return err
}
}
return nil
} }