channeldb: export DB migration related functions

We'll need to know whether a database was migrated to the latest version
in our upcoming data migration tool. To be able to determine the current
version of a DB and the total number of migrations in existence, we need
to export some of the functions in channeldb.
We also add some helper functions for adding tombstone and other markers
to a database.
This commit is contained in:
Oliver Gugger
2022-09-30 14:17:13 +02:00
parent 28c8e8b6dd
commit 7e76326b97
3 changed files with 189 additions and 7 deletions

View File

@@ -2,6 +2,7 @@ package channeldb
import (
"bytes"
"errors"
"fmt"
"github.com/lightningnetwork/lnd/kvdb"
@@ -20,6 +21,15 @@ var (
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// a list of optional migrations that have been applied.
optionalVersionKey = []byte("ovk")
// TombstoneKey is the key under which we add a tag in the source DB
// after we've successfully and completely migrated it to the target/
// destination DB.
TombstoneKey = []byte("data-migration-tombstone")
// ErrMarkerNotPresent is the error that is returned if the queried
// marker is not present in the given database.
ErrMarkerNotPresent = errors.New("marker not present")
)
// Meta structure holds the database meta information.
@@ -33,7 +43,7 @@ func (d *DB) FetchMeta() (*Meta, error) {
var meta *Meta
err := kvdb.View(d, func(tx kvdb.RTx) error {
return fetchMeta(meta, tx)
return FetchMeta(meta, tx)
}, func() {
meta = &Meta{}
})
@@ -44,10 +54,9 @@ func (d *DB) FetchMeta() (*Meta, error) {
return meta, nil
}
// fetchMeta is an internal helper function used in order to allow callers to
// re-use a database transaction. See the publicly exported FetchMeta method
// for more information.
func fetchMeta(meta *Meta, tx kvdb.RTx) error {
// FetchMeta is a helper function used in order to allow callers to re-use a
// database transaction.
func FetchMeta(meta *Meta, tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
@@ -149,7 +158,7 @@ func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
return om, nil
}
// fetchOptionalMeta writes an optional meta to the database.
// putOptionalMeta writes an optional meta to the database.
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
@@ -176,3 +185,62 @@ func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return metaBucket.Put(optionalVersionKey, b.Bytes())
}, func() {})
}
// CheckMarkerPresent returns the marker under the requested key or
// ErrMarkerNotFound if either the root bucket or the marker key within that
// bucket does not exist.
func CheckMarkerPresent(tx kvdb.RTx, markerKey []byte) ([]byte, error) {
markerBucket := tx.ReadBucket(markerKey)
if markerBucket == nil {
return nil, ErrMarkerNotPresent
}
val := markerBucket.Get(markerKey)
// If we wrote the marker correctly, we created a bucket _and_ created a
// key with a non-empty value. It doesn't matter to us whether the key
// exists or whether its value is empty, to us, it just means the marker
// isn't there.
if len(val) == 0 {
return nil, ErrMarkerNotPresent
}
return val, nil
}
// EnsureNoTombstone returns an error if there is a tombstone marker in the DB
// of the given transaction.
func EnsureNoTombstone(tx kvdb.RTx) error {
marker, err := CheckMarkerPresent(tx, TombstoneKey)
if err == ErrMarkerNotPresent {
// No marker present, so no tombstone. The DB is still alive.
return nil
}
if err != nil {
return err
}
// There was no error so there is a tombstone marker/tag. We cannot use
// this DB anymore.
return fmt.Errorf("refusing to use db, it was marked with a tombstone "+
"after successful data migration; tombstone reads: %s",
string(marker))
}
// AddMarker adds the marker with the given key into a top level bucket with the
// same name. So the structure will look like:
//
// marker-key (top level bucket)
// |-> marker-key:marker-value (key/value pair)
func AddMarker(tx kvdb.RwTx, markerKey, markerValue []byte) error {
if len(markerValue) == 0 {
return fmt.Errorf("marker value cannot be empty")
}
markerBucket, err := tx.CreateTopLevelBucket(markerKey)
if err != nil {
return err
}
return markerBucket.Put(markerKey, markerValue)
}