wtdb: let DeleteCommitedUpdates move session to terminal

In this commit, we adjust the DeleteCommitmentUpdate method so that it
marks a session as Terminal (if there are updates to delete) since once
we have deleted a commitment update from a session - the session is no
longer useable.
This commit is contained in:
Elle Mouton
2023-11-27 16:30:53 +02:00
parent 5cb8c8df7e
commit cbf08940ca
2 changed files with 71 additions and 10 deletions

View File

@@ -1860,9 +1860,10 @@ func (c *ClientDB) MarkChannelClosed(chanID lnwire.ChannelID,
// isSessionClosable returns true if a session is considered closable. A session
// is considered closable only if all the following points are true:
// 1) It has no un-acked updates.
// 2) It is exhausted (ie it can't accept any more updates)
// 3) All the channels that it has acked updates for are closed.
// 1. It has no un-acked updates.
// 2. It is exhausted (ie it can't accept any more updates) OR it has been
// marked as terminal.
// 3. All the channels that it has acked updates for are closed.
func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
id *SessionID) (bool, error) {
@@ -1893,10 +1894,14 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
return false, err
}
isTerminal := session.Status == CSessionTerminal
// We have already checked that the session has no more committed
// updates. So now we can check if the session is exhausted.
if session.SeqNum < session.Policy.MaxUpdates {
// If the session is not yet exhausted, it is not yet closable.
// updates. So now we can check if the session is exhausted or has a
// terminal state.
if !isTerminal && session.SeqNum < session.Policy.MaxUpdates {
// If the session is not yet exhausted, and it is not yet in a
// terminal state then it is not yet closable.
return false, nil
}
@@ -1916,11 +1921,16 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
}
}
// If the session has no acked-updates, then something is wrong since
// the above check ensures that this session has been exhausted meaning
// that it should have MaxUpdates acked updates.
ackedRangeBkt := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
if ackedRangeBkt == nil {
if isTerminal {
return true, nil
}
// If the session has no acked-updates, and it is not in a
// terminal state then something is wrong since the above check
// ensures that this session has been exhausted meaning that it
// should have MaxUpdates acked updates.
return false, fmt.Errorf("no acked-updates found for "+
"exhausted session %s", id)
}
@@ -2068,7 +2078,6 @@ func (c *ClientDB) CommitUpdate(id *SessionID,
lastApplied = session.TowerLastApplied
return nil
}, func() {
lastApplied = 0
})
@@ -2334,6 +2343,20 @@ func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error {
return err
}
session, err := getClientSessionBody(sessions, id[:])
if err != nil {
return err
}
// Once we delete a committed update from the session, the
// SeqNum of the session will be incorrect and so the session
// should be marked as terminal.
session.Status = CSessionTerminal
err = putClientSessionBody(sessionBkt, session)
if err != nil {
return err
}
// Delete all the committed updates in one go by deleting the
// session commits bucket.
return sessionBkt.DeleteNestedBucket(cSessionCommits)

View File

@@ -1002,6 +1002,44 @@ func testMarkChannelClosed(h *clientDBHarness) {
// Assert that we now can delete the session.
h.deleteSession(session1.ID, nil)
require.Empty(h.t, h.listClosableSessions(nil))
// We also want to test that a session can be deleted if it has been
// marked as terminal.
// Create session2 with MaxUpdates set to 5.
session2 := h.randSession(h.t, tower.ID, 5)
h.insertSession(session2, nil)
// Create and register channel 7.
chanID7 := randChannelID(h.t)
h.registerChan(chanID7, nil, nil)
// Add two updates for channel 7 in session 2. Ack one of them so that
// the mapping from this channel to this session is created but don't
// ack the other since we want to delete the committed update later.
update = randCommittedUpdateForChannel(h.t, chanID7, 1)
h.commitUpdate(&session2.ID, update, nil)
h.ackUpdate(&session2.ID, 1, 1, nil)
update = randCommittedUpdateForChannel(h.t, chanID7, 2)
h.commitUpdate(&session2.ID, update, nil)
// Check that attempting to delete the session will fail since it is not
// yet considered closable.
h.deleteSession(session2.ID, wtdb.ErrSessionNotClosable)
// Now delete the added committed updates. This should put the
// session in the terminal state after which we should be able to
// delete the session.
h.deleteCommittedUpdates(&session2.ID, nil)
// Marking channel 7 as closed will now return session 2 since it has
// been marked as terminal.
sl = h.markChannelClosed(chanID7, 1, nil)
require.ElementsMatch(h.t, sl, []wtdb.SessionID{session2.ID})
// We should now be able to delete the session.
h.deleteSession(session2.ID, nil)
}
// testAckUpdate asserts the behavior of AckUpdate.