From b16df45076c7fcf2657137b018c8b8f2af280bbc Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 9 Mar 2023 10:26:06 +0200 Subject: [PATCH] watchtower: add sessionID index In this commit, a new session-ID index is added to the tower client db with the help of a migration. This index holds a mapping from a db-assigned-ID (a uint64 encoded using BigSize encoding) to real session ID (33 bytes). This mapping will help us save space in future when persisting references to sessions. --- watchtower/wtdb/client_db.go | 82 ++++++++++- watchtower/wtdb/log.go | 2 + watchtower/wtdb/migration6/client_db.go | 114 ++++++++++++++ watchtower/wtdb/migration6/client_db_test.go | 147 +++++++++++++++++++ watchtower/wtdb/migration6/codec.go | 17 +++ watchtower/wtdb/migration6/log.go | 14 ++ watchtower/wtdb/version.go | 4 + 7 files changed, 373 insertions(+), 7 deletions(-) create mode 100644 watchtower/wtdb/migration6/client_db.go create mode 100644 watchtower/wtdb/migration6/client_db_test.go create mode 100644 watchtower/wtdb/migration6/codec.go create mode 100644 watchtower/wtdb/migration6/log.go diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 3cbe0c6f1..2079a0c6f 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -35,10 +35,15 @@ var ( // cSessionBkt is a top-level bucket storing: // session-id => cSessionBody -> encoded ClientSessionBody + // => cSessionDBID -> db-assigned-id // => cSessionCommits => seqnum -> encoded CommittedUpdate // => cSessionAckRangeIndex => db-chan-id => start -> end cSessionBkt = []byte("client-session-bucket") + // cSessionDBID is a key used in the cSessionBkt to store the + // db-assigned-id of a session. + cSessionDBID = []byte("client-session-db-id") + // cSessionBody is a sub-bucket of cSessionBkt storing only the body of // the ClientSession. cSessionBody = []byte("client-session-body") @@ -55,6 +60,10 @@ var ( // db-assigned-id -> channel-ID cChanIDIndexBkt = []byte("client-channel-id-index") + // cSessionIDIndexBkt is a top-level bucket storing: + // db-assigned-id -> session-id + cSessionIDIndexBkt = []byte("client-session-id-index") + // cTowerBkt is a top-level bucket storing: // tower-id -> encoded Tower. cTowerBkt = []byte("client-tower-bucket") @@ -241,6 +250,7 @@ func initClientDBBuckets(tx kvdb.RwTx) error { cTowerIndexBkt, cTowerToSessionIndexBkt, cChanIDIndexBkt, + cSessionIDIndexBkt, } for _, bucket := range buckets { @@ -723,24 +733,58 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error { } } - // Add the new entry to the towerID-to-SessionID index. - indexBkt := towerToSessionIndex.NestedReadWriteBucket( - towerID.Bytes(), - ) - if indexBkt == nil { - return ErrTowerNotFound + // Get the session-ID index bucket. + dbIDIndex := tx.ReadWriteBucket(cSessionIDIndexBkt) + if dbIDIndex == nil { + return ErrUninitializedDB } - err = indexBkt.Put(session.ID[:], []byte{1}) + // Get a new, unique, ID for this session from the session-ID + // index bucket. + nextSeq, err := dbIDIndex.NextSequence() if err != nil { return err } + // Add the new entry to the dbID-to-SessionID index. + newIndex, err := writeBigSize(nextSeq) + if err != nil { + return err + } + + err = dbIDIndex.Put(newIndex, session.ID[:]) + if err != nil { + return err + } + + // Also add the db-assigned-id to the session bucket under the + // cSessionDBID key. sessionBkt, err := sessions.CreateBucket(session.ID[:]) if err != nil { return err } + err = sessionBkt.Put(cSessionDBID, newIndex) + if err != nil { + return err + } + + // TODO(elle): migrate the towerID-to-SessionID to use the + // new db-assigned sessionID's rather. + + // Add the new entry to the towerID-to-SessionID index. + towerSessions := towerToSessionIndex.NestedReadWriteBucket( + towerID.Bytes(), + ) + if towerSessions == nil { + return ErrTowerNotFound + } + + err = towerSessions.Put(session.ID[:], []byte{1}) + if err != nil { + return err + } + // Finally, write the client session's body in the sessions // bucket. return putClientSessionBody(sessionBkt, session) @@ -1882,6 +1926,30 @@ func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64, return id, idBytes, nil } +// getDBSessionID returns the db-assigned session ID for the given real session +// ID. It returns both the uint64 and byte representation. +func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64, + []byte, error) { + + sessionBkt := sessionsBkt.NestedReadBucket(sessionID[:]) + if sessionBkt == nil { + return 0, nil, ErrClientSessionNotFound + } + + idBytes := sessionBkt.Get(cSessionDBID) + if len(idBytes) == 0 { + return 0, nil, fmt.Errorf("no db-assigned ID found for "+ + "session ID %s", sessionID) + } + + id, err := readBigSize(idBytes) + if err != nil { + return 0, nil, err + } + + return id, idBytes, nil +} + // writeBigSize will encode the given uint64 as a BigSize byte slice. func writeBigSize(i uint64) ([]byte, error) { var b bytes.Buffer diff --git a/watchtower/wtdb/log.go b/watchtower/wtdb/log.go index f7952ba4e..638542abb 100644 --- a/watchtower/wtdb/log.go +++ b/watchtower/wtdb/log.go @@ -8,6 +8,7 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtdb/migration3" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration4" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration5" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration6" ) // log is a logger that is initialized with no output filters. This @@ -36,6 +37,7 @@ func UseLogger(logger btclog.Logger) { migration3.UseLogger(logger) migration4.UseLogger(logger) migration5.UseLogger(logger) + migration6.UseLogger(logger) } // logClosure is used to provide a closure over expensive logging operations so diff --git a/watchtower/wtdb/migration6/client_db.go b/watchtower/wtdb/migration6/client_db.go new file mode 100644 index 000000000..8d5ffbc29 --- /dev/null +++ b/watchtower/wtdb/migration6/client_db.go @@ -0,0 +1,114 @@ +package migration6 + +import ( + "bytes" + "encoding/binary" + "errors" + + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" +) + +var ( + // cSessionBkt is a top-level bucket storing: + // session-id => cSessionBody -> encoded ClientSessionBody + // => cSessionDBID -> db-assigned-id + // => cSessionCommits => seqnum -> encoded CommittedUpdate + // => cSessionAcks => seqnum -> encoded BackupID + cSessionBkt = []byte("client-session-bucket") + + // cSessionDBID is a key used in the cSessionBkt to store the + // db-assigned-id of a session. + cSessionDBID = []byte("client-session-db-id") + + // cSessionIDIndexBkt is a top-level bucket storing: + // db-assigned-id -> session-id + cSessionIDIndexBkt = []byte("client-session-id-index") + + // cSessionBody is a sub-bucket of cSessionBkt storing only the body of + // the ClientSession. + cSessionBody = []byte("client-session-body") + + // ErrUninitializedDB signals that top-level buckets for the database + // have not been initialized. + ErrUninitializedDB = errors.New("db not initialized") + + // ErrCorruptClientSession signals that the client session's on-disk + // structure deviates from what is expected. + ErrCorruptClientSession = errors.New("client session corrupted") + + byteOrder = binary.BigEndian +) + +// MigrateSessionIDIndex adds a new session ID index to the tower client db. +// This index is a mapping from db-assigned ID (a uint64 encoded using BigSize) +// to real session ID (33 bytes). This mapping will allow us to persist session +// pointers with fewer bytes in the future. +func MigrateSessionIDIndex(tx kvdb.RwTx) error { + log.Infof("Migrating the tower client db to add a new session ID " + + "index which stores a mapping from db-assigned ID to real " + + "session ID") + + // Create a new top-level bucket for the index. + indexBkt, err := tx.CreateTopLevelBucket(cSessionIDIndexBkt) + if err != nil { + return err + } + + // Get the existing top-level sessions bucket. + sessionsBkt := tx.ReadWriteBucket(cSessionBkt) + if sessionsBkt == nil { + return ErrUninitializedDB + } + + // Iterate over the sessions bucket where each key is a session-ID. + return sessionsBkt.ForEach(func(sessionID, _ []byte) error { + // Ask the DB for a new, unique, id for the index bucket. + nextSeq, err := indexBkt.NextSequence() + if err != nil { + return err + } + + newIndex, err := writeBigSize(nextSeq) + if err != nil { + return err + } + + // Add the new db-assigned-ID to real-session-ID pair to the + // new index bucket. + err = indexBkt.Put(newIndex, sessionID) + if err != nil { + return err + } + + // Get the sub-bucket for this specific session ID. + sessionBkt := sessionsBkt.NestedReadWriteBucket(sessionID) + if sessionBkt == nil { + return ErrCorruptClientSession + } + + // Here we ensure that the session bucket includes a session + // body. The only reason we do this is so that we can simulate + // a migration fail in a test to ensure that a migration fail + // results in an untouched db. + sessionBodyBytes := sessionBkt.Get(cSessionBody) + if sessionBodyBytes == nil { + return ErrCorruptClientSession + } + + // Add the db-assigned ID of the session to the session under + // the cSessionDBID key. + return sessionBkt.Put(cSessionDBID, newIndex) + }) +} + +// writeBigSize will encode the given uint64 as a BigSize byte slice. +func writeBigSize(i uint64) ([]byte, error) { + var b bytes.Buffer + err := tlv.WriteVarInt(&b, i, &[8]byte{}) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} diff --git a/watchtower/wtdb/migration6/client_db_test.go b/watchtower/wtdb/migration6/client_db_test.go new file mode 100644 index 000000000..c4928e2f9 --- /dev/null +++ b/watchtower/wtdb/migration6/client_db_test.go @@ -0,0 +1,147 @@ +package migration6 + +import ( + "bytes" + "testing" + + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" +) + +var ( + // pre is the expected data in the sessions bucket before the migration. + pre = map[string]interface{}{ + sessionIDToString(100): map[string]interface{}{ + string(cSessionBody): string([]byte{1, 2, 3}), + }, + sessionIDToString(222): map[string]interface{}{ + string(cSessionBody): string([]byte{4, 5, 6}), + }, + } + + // preFailCorruptDB should fail the migration due to no session body + // being found for a given session ID. + preFailCorruptDB = map[string]interface{}{ + sessionIDToString(100): "", + } + + // post is the expected session index after migration. + postIndex = map[string]interface{}{ + indexToString(1): sessionIDToString(100), + indexToString(2): sessionIDToString(222), + } + + // postSessions is the expected data in the sessions bucket after the + // migration. + postSessions = map[string]interface{}{ + sessionIDToString(100): map[string]interface{}{ + string(cSessionBody): string([]byte{1, 2, 3}), + string(cSessionDBID): indexToString(1), + }, + sessionIDToString(222): map[string]interface{}{ + string(cSessionBody): string([]byte{4, 5, 6}), + string(cSessionDBID): indexToString(2), + }, + } +) + +// TestMigrateSessionIDIndex tests that the MigrateSessionIDIndex function +// correctly adds a new session-id index to the DB and also correctly updates +// the existing session bucket. +func TestMigrateSessionIDIndex(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + shouldFail bool + pre map[string]interface{} + postSessions map[string]interface{} + postIndex map[string]interface{} + }{ + { + name: "migration ok", + shouldFail: false, + pre: pre, + postSessions: postSessions, + postIndex: postIndex, + }, + { + name: "fail due to corrupt db", + shouldFail: true, + pre: preFailCorruptDB, + }, + { + name: "no channel details", + shouldFail: false, + pre: nil, + postSessions: nil, + postIndex: nil, + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + // Before the migration we have a details bucket. + before := func(tx kvdb.RwTx) error { + return migtest.RestoreDB( + tx, cSessionBkt, test.pre, + ) + } + + // After the migration, we should have an untouched + // summary bucket and a new index bucket. + after := func(tx kvdb.RwTx) error { + // If the migration fails, the details bucket + // should be untouched. + if test.shouldFail { + if err := migtest.VerifyDB( + tx, cSessionBkt, test.pre, + ); err != nil { + return err + } + + return nil + } + + // Else, we expect an updated summary bucket + // and a new index bucket. + err := migtest.VerifyDB( + tx, cSessionBkt, test.postSessions, + ) + if err != nil { + return err + } + + return migtest.VerifyDB( + tx, cSessionIDIndexBkt, test.postIndex, + ) + } + + migtest.ApplyMigration( + t, before, after, MigrateSessionIDIndex, + test.shouldFail, + ) + }) + } +} + +func indexToString(id uint64) string { + var newIndex bytes.Buffer + err := tlv.WriteVarInt(&newIndex, id, &[8]byte{}) + if err != nil { + panic(err) + } + + return newIndex.String() +} + +func sessionIDToString(id uint64) string { + var chanID SessionID + byteOrder.PutUint64(chanID[:], id) + return chanID.String() +} diff --git a/watchtower/wtdb/migration6/codec.go b/watchtower/wtdb/migration6/codec.go new file mode 100644 index 000000000..11edbf299 --- /dev/null +++ b/watchtower/wtdb/migration6/codec.go @@ -0,0 +1,17 @@ +package migration6 + +import ( + "encoding/hex" +) + +// SessionIDSize is 33-bytes; it is a serialized, compressed public key. +const SessionIDSize = 33 + +// SessionID is created from the remote public key of a client, and serves as a +// unique identifier and authentication for sending state updates. +type SessionID [SessionIDSize]byte + +// String returns a hex encoding of the session id. +func (s SessionID) String() string { + return hex.EncodeToString(s[:]) +} diff --git a/watchtower/wtdb/migration6/log.go b/watchtower/wtdb/migration6/log.go new file mode 100644 index 000000000..e43e7d27e --- /dev/null +++ b/watchtower/wtdb/migration6/log.go @@ -0,0 +1,14 @@ +package migration6 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index dbcad3715..1a186044a 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtdb/migration3" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration4" "github.com/lightningnetwork/lnd/watchtower/wtdb/migration5" + "github.com/lightningnetwork/lnd/watchtower/wtdb/migration6" ) // txMigration is a function which takes a prior outdated version of the @@ -59,6 +60,9 @@ var clientDBVersions = []version{ { txMigration: migration5.MigrateCompleteTowerToSessionIndex, }, + { + txMigration: migration6.MigrateSessionIDIndex, + }, } // getLatestDBVersion returns the last known database version.