watchtower: build channel to sessionIDs index

In this commit, a migration is added that adds an index from channel to
sessionIDs (using the DB-assigned session IDs). This will make it easier
in future to know which sessions have updates for which channels.
This commit is contained in:
Elle Mouton
2023-03-09 10:30:12 +02:00
parent b16df45076
commit ee0353dd24
7 changed files with 485 additions and 1 deletions

View File

@@ -23,8 +23,13 @@ var (
// cChanDetailsBkt is a top-level bucket storing:
// channel-id => cChannelSummary -> encoded ClientChanSummary.
// => cChanDBID -> db-assigned-id
// => cChanSessions => db-session-id -> 1
cChanDetailsBkt = []byte("client-channel-detail-bucket")
// cChanSessions is a sub-bucket of cChanDetailsBkt which stores:
// db-session-id -> 1
cChanSessions = []byte("client-channel-sessions")
// cChanDBID is a key used in the cChanDetailsBkt to store the
// db-assigned-id of a channel.
cChanDBID = []byte("client-channel-db-id")
@@ -1454,7 +1459,7 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
return ErrUninitializedDB
}
chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
if chanDetailsBkt == nil {
return ErrUninitializedDB
}
@@ -1538,6 +1543,23 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
return err
}
dbSessionID, _, err := getDBSessionID(sessions, *id)
if err != nil {
return err
}
chanDetails := chanDetailsBkt.NestedReadWriteBucket(
committedUpdate.BackupID.ChanID[:],
)
if chanDetails == nil {
return ErrChannelNotRegistered
}
err = putChannelToSessionMapping(chanDetails, dbSessionID)
if err != nil {
return err
}
// Get the range index for the given session-channel pair.
index, err := c.getRangeIndex(tx, *id, chanID)
if err != nil {
@@ -1548,6 +1570,26 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
}, func() {})
}
// putChannelToSessionMapping adds the given session ID to a channel's
// cChanSessions bucket.
func putChannelToSessionMapping(chanDetails kvdb.RwBucket,
dbSessID uint64) error {
chanSessIDsBkt, err := chanDetails.CreateBucketIfNotExists(
cChanSessions,
)
if err != nil {
return err
}
b, err := writeBigSize(dbSessID)
if err != nil {
return err
}
return chanSessIDsBkt.Put(b, []byte{1})
}
// getClientSessionBody loads the body of a ClientSession from the sessions
// bucket corresponding to the serialized session id. This does not deserialize
// the CommittedUpdates, AckUpdates or the Tower associated with the session.