Merge pull request #7491 from ellemouton/fillInTowerToSessionIDIndex

watchtower/wtdb: add tower-to-session index entry for all towers
This commit is contained in:
Oliver Gugger 2023-03-08 13:17:05 +01:00 committed by GitHub
commit a28fbd4690
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 313 additions and 1 deletions

View File

@ -406,7 +406,10 @@ in the lnwire package](https://github.com/lightningnetwork/lnd/pull/7303)
speed of listing sessions for a particular tower ID](
https://github.com/lightningnetwork/lnd/pull/6972). This PR also ensures a
closer coupling of Towers and Sessions and ensures that a session cannot be
added if the tower it is referring to does not exist.
added if the tower it is referring to does not exist. A [follow-up migration
was added](https://github.com/lightningnetwork/lnd/pull/7491) to ensure that
entries are added to the new index for _all_ towers in the db, including those
for which there are not yet associated sessions.
* [Remove `AckedUpdates` & `CommittedUpdates` from the `ClientSession`
struct](https://github.com/lightningnetwork/lnd/pull/6928) in order to

View File

@ -7,6 +7,7 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration4"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration5"
)
// log is a logger that is initialized with no output filters. This
@ -34,6 +35,7 @@ func UseLogger(logger btclog.Logger) {
migration2.UseLogger(logger)
migration3.UseLogger(logger)
migration4.UseLogger(logger)
migration5.UseLogger(logger)
}
// logClosure is used to provide a closure over expensive logging operations so

View File

@ -0,0 +1,90 @@
package migration5
import (
"encoding/binary"
"errors"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// cTowerBkt is a top-level bucket storing:
// tower-id -> encoded Tower.
cTowerBkt = []byte("client-tower-bucket")
// cTowerIDToSessionIDIndexBkt is a top-level bucket storing:
// tower-id -> session-id -> 1
cTowerIDToSessionIDIndexBkt = []byte(
"client-tower-to-session-index-bucket",
)
// ErrUninitializedDB signals that top-level buckets for the database
// have not been initialized.
ErrUninitializedDB = errors.New("db not initialized")
// byteOrder is the default endianness used when serializing integers.
byteOrder = binary.BigEndian
)
// MigrateCompleteTowerToSessionIndex ensures that the tower-to-session index
// contains entries for all towers in the db. This is necessary because
// migration1 only created entries in the index for towers that the client had
// at least one session with. This migration thus makes sure that there is
// always a tower-to-sessions index entry for a tower even if there are no
// sessions with that tower.
func MigrateCompleteTowerToSessionIndex(tx kvdb.RwTx) error {
log.Infof("Migrating the tower client db to ensure that there is an " +
"entry in the towerID-to-sessionID index for every tower in " +
"the db")
// First, we collect all the towers that we should add an entry for in
// the index.
towerIDs, err := listTowerIDs(tx)
if err != nil {
return err
}
// Create a new top-level bucket for the index if it does not yet exist.
indexBkt, err := tx.CreateTopLevelBucket(cTowerIDToSessionIDIndexBkt)
if err != nil {
return err
}
// Finally, ensure that there is an entry in the tower-to-session index
// for each of our towers.
for _, id := range towerIDs {
// Create a sub-bucket using the tower ID.
_, err := indexBkt.CreateBucketIfNotExists(id.Bytes())
if err != nil {
return err
}
}
return nil
}
// listTowerIDs iterates through the cTowerBkt and collects a list of all the
// TowerIDs.
func listTowerIDs(tx kvdb.RTx) ([]*TowerID, error) {
var ids []*TowerID
towerBucket := tx.ReadBucket(cTowerBkt)
if towerBucket == nil {
return nil, ErrUninitializedDB
}
err := towerBucket.ForEach(func(towerIDBytes, _ []byte) error {
id, err := TowerIDFromBytes(towerIDBytes)
if err != nil {
return err
}
ids = append(ids, &id)
return nil
})
if err != nil {
return nil, err
}
return ids, nil
}

View File

@ -0,0 +1,162 @@
package migration5
import (
"testing"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
// preIndex is the data in the tower-to-session index before the
// migration.
preIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{
sessionIDString("1"): string([]byte{1}),
sessionIDString("3"): string([]byte{1}),
},
towerIDString(3): map[string]interface{}{
sessionIDString("4"): string([]byte{1}),
},
}
// preIndexBadTowerID has an invalid TowerID. This is used to test that
// the migration correctly rolls back on failure.
preIndexBadTowerID = map[string]interface{}{
"1": map[string]interface{}{},
}
// towerDBEmpty is the data in an empty tower bucket before the
// migration.
towerDBEmpty = map[string]interface{}{}
towerDBMatchIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{},
towerIDString(3): map[string]interface{}{},
}
towerDBWithExtraEntries = map[string]interface{}{
towerIDString(1): map[string]interface{}{},
towerIDString(3): map[string]interface{}{},
towerIDString(4): map[string]interface{}{},
}
// post is the expected data after migration.
postIndex = map[string]interface{}{
towerIDString(1): map[string]interface{}{
sessionIDString("1"): string([]byte{1}),
sessionIDString("3"): string([]byte{1}),
},
towerIDString(3): map[string]interface{}{
sessionIDString("4"): string([]byte{1}),
},
towerIDString(4): map[string]interface{}{},
}
)
// TestCompleteTowerToSessionIndex tests that the
// MigrateCompleteTowerToSessionIndex function correctly completes the
// towerID-to-sessionID index in the tower client db.
func TestCompleteTowerToSessionIndex(t *testing.T) {
t.Parallel()
tests := []struct {
name string
shouldFail bool
towerDB map[string]interface{}
pre map[string]interface{}
post map[string]interface{}
}{
{
name: "no changes - empty tower db",
towerDB: towerDBEmpty,
pre: preIndex,
post: preIndex,
},
{
name: "no changes - tower db matches index",
towerDB: towerDBMatchIndex,
pre: preIndex,
post: preIndex,
},
{
name: "fill in missing towers",
towerDB: towerDBWithExtraEntries,
pre: preIndex,
post: postIndex,
},
{
name: "fail due to corrupt db",
shouldFail: true,
towerDB: preIndexBadTowerID,
pre: preIndex,
post: preIndex,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// Before the migration we have a tower bucket and an
// initial tower-to-session index bucket.
before := func(tx kvdb.RwTx) error {
err := migtest.RestoreDB(
tx, cTowerBkt, test.towerDB,
)
if err != nil {
return err
}
return migtest.RestoreDB(
tx, cTowerIDToSessionIDIndexBkt,
test.pre,
)
}
// After the migration, we should have an untouched
// tower bucket and a possibly tweaked tower-to-session
// index bucket.
after := func(tx kvdb.RwTx) error {
if err := migtest.VerifyDB(
tx, cTowerBkt, test.towerDB,
); err != nil {
return err
}
// If we expect our migration to fail, we don't
// expect our index bucket to be unchanged.
if test.shouldFail {
return migtest.VerifyDB(
tx, cTowerIDToSessionIDIndexBkt,
test.pre,
)
}
return migtest.VerifyDB(
tx, cTowerIDToSessionIDIndexBkt,
test.post,
)
}
migtest.ApplyMigration(
t, before, after,
MigrateCompleteTowerToSessionIndex,
test.shouldFail,
)
})
}
}
func towerIDString(id int) string {
towerID := TowerID(id)
return string(towerID.Bytes())
}
func sessionIDString(id string) string {
var sessID SessionID
copy(sessID[:], id)
return string(sessID[:])
}

View File

@ -0,0 +1,37 @@
package migration5
import (
"encoding/binary"
"fmt"
)
// TowerID is a unique 64-bit identifier allocated to each unique watchtower.
// This allows the client to conserve on-disk space by not needing to always
// reference towers by their pubkey.
type TowerID uint64
// TowerIDFromBytes constructs a TowerID from the provided byte slice. The
// argument must have at least 8 bytes, and should contain the TowerID in
// big-endian byte order.
func TowerIDFromBytes(towerIDBytes []byte) (TowerID, error) {
if len(towerIDBytes) != 8 {
return 0, fmt.Errorf("not enough bytes in tower ID. "+
"Expected 8, got: %d", len(towerIDBytes))
}
return TowerID(byteOrder.Uint64(towerIDBytes)), nil
}
// Bytes encodes a TowerID into an 8-byte slice in big-endian byte order.
func (id TowerID) Bytes() []byte {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], uint64(id))
return buf[:]
}
// SessionIDSize is 33-bytes; it is a serialized, compressed public key.
const SessionIDSize = 33
// SessionID is created from the remote public key of a client, and serves as a
// unique identifier and authentication for sending state updates.
type SessionID [SessionIDSize]byte

View File

@ -0,0 +1,14 @@
package migration5
import (
"github.com/btcsuite/btclog"
)
// log is a logger that is initialized as disabled. This means the package will
// not perform any logging by default until a logger is set.
var log = btclog.Disabled
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration2"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration3"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration4"
"github.com/lightningnetwork/lnd/watchtower/wtdb/migration5"
)
// txMigration is a function which takes a prior outdated version of the
@ -55,6 +56,9 @@ var clientDBVersions = []version{
migration4.DefaultSessionsPerTx,
),
},
{
txMigration: migration5.MigrateCompleteTowerToSessionIndex,
},
}
// getLatestDBVersion returns the last known database version.