watchtower/wtdb: start populating channel max commitment

In this commit, a new key, cChanMaxCommitmentHeight, is added to the
channel details bucket. This key will hold the highest commitment number
that the tower has been handed for this channel. In this commit, we
start writing to it in the two places where a backup is first persisted
in the tower client db: 1) CommitUpdate and 2) in the Queue's `addItem`
method. The logic for both 1 & 2 is tested in the next commit which adds
a DB helper that allows us to read the new field.

A follow up commit will do a migration to back-fill the new field.
This commit is contained in:
Elle Mouton 2023-11-23 15:33:05 +02:00
parent f005b248ce
commit 01ba2661db
No known key found for this signature in database
GPG Key ID: D7D916376026F177
2 changed files with 80 additions and 2 deletions

View File

@ -25,6 +25,7 @@ var (
// => cChanDBID -> db-assigned-id
// => cChanSessions => db-session-id -> 1
// => cChanClosedHeight -> block-height
// => cChanMaxCommitmentHeight -> commitment-height
cChanDetailsBkt = []byte("client-channel-detail-bucket")
// cChanSessions is a sub-bucket of cChanDetailsBkt which stores:
@ -45,6 +46,13 @@ var (
// body of ClientChanSummary.
cChannelSummary = []byte("client-channel-summary")
// cChanMaxCommitmentHeight is a key used in the cChanDetailsBkt used
// to store the highest commitment height for this channel that the
// tower has been handed.
cChanMaxCommitmentHeight = []byte(
"client-channel-max-commitment-height",
)
// cSessionBkt is a top-level bucket storing:
// session-id => cSessionBody -> encoded ClientSessionBody
// => cSessionDBID -> db-assigned-id
@ -1963,6 +1971,12 @@ func (c *ClientDB) CommitUpdate(id *SessionID,
return err
}
// Update the channel's max commitment height if needed.
err = maybeUpdateMaxCommitHeight(tx, update.BackupID)
if err != nil {
return err
}
// Finally, capture the session's last applied value so it can
// be sent in the next state update to the tower.
lastApplied = session.TowerLastApplied
@ -2178,9 +2192,11 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
// GetDBQueue returns a BackupID Queue instance under the given namespace.
func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
return NewQueueDB[*BackupID](
return NewQueueDB(
c.db, namespace, func() *BackupID {
return &BackupID{}
}, func(tx kvdb.RwTx, item *BackupID) error {
return maybeUpdateMaxCommitHeight(tx, *item)
},
)
}
@ -2720,6 +2736,58 @@ func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64,
return id, idBytes, nil
}
// maybeUpdateMaxCommitHeight updates the given channel details bucket with the
// given height if it is larger than the current max height stored for the
// channel.
func maybeUpdateMaxCommitHeight(tx kvdb.RwTx, backupID BackupID) error {
chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
if chanDetailsBkt == nil {
return ErrUninitializedDB
}
// If an entry for this channel does not exist in the channel details
// bucket then we exit here as this means that the channel has been
// closed.
chanDetails := chanDetailsBkt.NestedReadWriteBucket(backupID.ChanID[:])
if chanDetails == nil {
return nil
}
putHeight := func() error {
b, err := writeBigSize(backupID.CommitHeight)
if err != nil {
return err
}
return chanDetails.Put(
cChanMaxCommitmentHeight, b,
)
}
// Get current height.
heightBytes := chanDetails.Get(cChanMaxCommitmentHeight)
// The height might have not been set yet, in which case
// we can just write the new height.
if len(heightBytes) == 0 {
return putHeight()
}
// Otherwise, read in the current max commitment height for the channel.
currentHeight, err := readBigSize(heightBytes)
if err != nil {
return err
}
// If the new height is not larger than the current persisted height,
// then there is nothing left for us to do.
if backupID.CommitHeight <= currentHeight {
return nil
}
return putHeight()
}
func getRealSessionID(sessIDIndexBkt kvdb.RBucket, dbID uint64) (*SessionID,
error) {

View File

@ -80,6 +80,7 @@ type DiskQueueDB[T Serializable] struct {
db kvdb.Backend
topLevelBkt []byte
constructor func() T
onItemWrite func(tx kvdb.RwTx, item T) error
}
// A compile-time check to ensure that DiskQueueDB implements the Queue
@ -89,12 +90,14 @@ var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
// that the DiskQueueDB can create its own namespace in the bolt db.
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
constructor func() T) Queue[T] {
constructor func() T,
onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
return &DiskQueueDB[T]{
db: db,
topLevelBkt: queueBktName,
constructor: constructor,
onItemWrite: onItemWrite,
}
}
@ -279,6 +282,13 @@ func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
return err
}
if d.onItemWrite != nil {
err = d.onItemWrite(tx, item)
if err != nil {
return err
}
}
// Find the index to use for placing this new item at the back of the
// queue.
var nextIndex uint64