watchtower: add sessionID index

In this commit, a new session-ID index is added to the tower client db
with the help of a migration. This index holds a mapping from a
db-assigned-ID (a uint64 encoded using BigSize encoding) to real session
ID (33 bytes). This mapping will help us save space in future when
persisting references to sessions.
This commit is contained in:
Elle Mouton
2023-03-09 10:26:06 +02:00
parent ae39cd9e91
commit b16df45076
7 changed files with 373 additions and 7 deletions

View File

@@ -35,10 +35,15 @@ var (
// cSessionBkt is a top-level bucket storing:
// session-id => cSessionBody -> encoded ClientSessionBody
// => cSessionDBID -> db-assigned-id
// => cSessionCommits => seqnum -> encoded CommittedUpdate
// => cSessionAckRangeIndex => db-chan-id => start -> end
cSessionBkt = []byte("client-session-bucket")
// cSessionDBID is a key used in the cSessionBkt to store the
// db-assigned-id of a session.
cSessionDBID = []byte("client-session-db-id")
// cSessionBody is a sub-bucket of cSessionBkt storing only the body of
// the ClientSession.
cSessionBody = []byte("client-session-body")
@@ -55,6 +60,10 @@ var (
// db-assigned-id -> channel-ID
cChanIDIndexBkt = []byte("client-channel-id-index")
// cSessionIDIndexBkt is a top-level bucket storing:
// db-assigned-id -> session-id
cSessionIDIndexBkt = []byte("client-session-id-index")
// cTowerBkt is a top-level bucket storing:
// tower-id -> encoded Tower.
cTowerBkt = []byte("client-tower-bucket")
@@ -241,6 +250,7 @@ func initClientDBBuckets(tx kvdb.RwTx) error {
cTowerIndexBkt,
cTowerToSessionIndexBkt,
cChanIDIndexBkt,
cSessionIDIndexBkt,
}
for _, bucket := range buckets {
@@ -723,24 +733,58 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error {
}
}
// Add the new entry to the towerID-to-SessionID index.
indexBkt := towerToSessionIndex.NestedReadWriteBucket(
towerID.Bytes(),
)
if indexBkt == nil {
return ErrTowerNotFound
// Get the session-ID index bucket.
dbIDIndex := tx.ReadWriteBucket(cSessionIDIndexBkt)
if dbIDIndex == nil {
return ErrUninitializedDB
}
err = indexBkt.Put(session.ID[:], []byte{1})
// Get a new, unique, ID for this session from the session-ID
// index bucket.
nextSeq, err := dbIDIndex.NextSequence()
if err != nil {
return err
}
// Add the new entry to the dbID-to-SessionID index.
newIndex, err := writeBigSize(nextSeq)
if err != nil {
return err
}
err = dbIDIndex.Put(newIndex, session.ID[:])
if err != nil {
return err
}
// Also add the db-assigned-id to the session bucket under the
// cSessionDBID key.
sessionBkt, err := sessions.CreateBucket(session.ID[:])
if err != nil {
return err
}
err = sessionBkt.Put(cSessionDBID, newIndex)
if err != nil {
return err
}
// TODO(elle): migrate the towerID-to-SessionID to use the
// new db-assigned sessionID's rather.
// Add the new entry to the towerID-to-SessionID index.
towerSessions := towerToSessionIndex.NestedReadWriteBucket(
towerID.Bytes(),
)
if towerSessions == nil {
return ErrTowerNotFound
}
err = towerSessions.Put(session.ID[:], []byte{1})
if err != nil {
return err
}
// Finally, write the client session's body in the sessions
// bucket.
return putClientSessionBody(sessionBkt, session)
@@ -1882,6 +1926,30 @@ func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64,
return id, idBytes, nil
}
// getDBSessionID returns the db-assigned session ID for the given real session
// ID. It returns both the uint64 and byte representation.
func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64,
[]byte, error) {
sessionBkt := sessionsBkt.NestedReadBucket(sessionID[:])
if sessionBkt == nil {
return 0, nil, ErrClientSessionNotFound
}
idBytes := sessionBkt.Get(cSessionDBID)
if len(idBytes) == 0 {
return 0, nil, fmt.Errorf("no db-assigned ID found for "+
"session ID %s", sessionID)
}
id, err := readBigSize(idBytes)
if err != nil {
return 0, nil, err
}
return id, idBytes, nil
}
// writeBigSize will encode the given uint64 as a BigSize byte slice.
func writeBigSize(i uint64) ([]byte, error) {
var b bytes.Buffer