diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 6ebfd273b..850ef5d17 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -1860,9 +1860,10 @@ func (c *ClientDB) MarkChannelClosed(chanID lnwire.ChannelID, // isSessionClosable returns true if a session is considered closable. A session // is considered closable only if all the following points are true: -// 1) It has no un-acked updates. -// 2) It is exhausted (ie it can't accept any more updates) -// 3) All the channels that it has acked updates for are closed. +// 1. It has no un-acked updates. +// 2. It is exhausted (ie it can't accept any more updates) OR it has been +// marked as terminal. +// 3. All the channels that it has acked updates for are closed. func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket, id *SessionID) (bool, error) { @@ -1893,10 +1894,14 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket, return false, err } + isTerminal := session.Status == CSessionTerminal + // We have already checked that the session has no more committed - // updates. So now we can check if the session is exhausted. - if session.SeqNum < session.Policy.MaxUpdates { - // If the session is not yet exhausted, it is not yet closable. + // updates. So now we can check if the session is exhausted or has a + // terminal state. + if !isTerminal && session.SeqNum < session.Policy.MaxUpdates { + // If the session is not yet exhausted, and it is not yet in a + // terminal state then it is not yet closable. return false, nil } @@ -1916,11 +1921,16 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket, } } - // If the session has no acked-updates, then something is wrong since - // the above check ensures that this session has been exhausted meaning - // that it should have MaxUpdates acked updates. ackedRangeBkt := sessBkt.NestedReadBucket(cSessionAckRangeIndex) if ackedRangeBkt == nil { + if isTerminal { + return true, nil + } + + // If the session has no acked-updates, and it is not in a + // terminal state then something is wrong since the above check + // ensures that this session has been exhausted meaning that it + // should have MaxUpdates acked updates. return false, fmt.Errorf("no acked-updates found for "+ "exhausted session %s", id) } @@ -2068,7 +2078,6 @@ func (c *ClientDB) CommitUpdate(id *SessionID, lastApplied = session.TowerLastApplied return nil - }, func() { lastApplied = 0 }) @@ -2334,6 +2343,20 @@ func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error { return err } + session, err := getClientSessionBody(sessions, id[:]) + if err != nil { + return err + } + + // Once we delete a committed update from the session, the + // SeqNum of the session will be incorrect and so the session + // should be marked as terminal. + session.Status = CSessionTerminal + err = putClientSessionBody(sessionBkt, session) + if err != nil { + return err + } + // Delete all the committed updates in one go by deleting the // session commits bucket. return sessionBkt.DeleteNestedBucket(cSessionCommits) diff --git a/watchtower/wtdb/client_db_test.go b/watchtower/wtdb/client_db_test.go index 4619695d0..489c6c016 100644 --- a/watchtower/wtdb/client_db_test.go +++ b/watchtower/wtdb/client_db_test.go @@ -1002,6 +1002,44 @@ func testMarkChannelClosed(h *clientDBHarness) { // Assert that we now can delete the session. h.deleteSession(session1.ID, nil) require.Empty(h.t, h.listClosableSessions(nil)) + + // We also want to test that a session can be deleted if it has been + // marked as terminal. + + // Create session2 with MaxUpdates set to 5. + session2 := h.randSession(h.t, tower.ID, 5) + h.insertSession(session2, nil) + + // Create and register channel 7. + chanID7 := randChannelID(h.t) + h.registerChan(chanID7, nil, nil) + + // Add two updates for channel 7 in session 2. Ack one of them so that + // the mapping from this channel to this session is created but don't + // ack the other since we want to delete the committed update later. + update = randCommittedUpdateForChannel(h.t, chanID7, 1) + h.commitUpdate(&session2.ID, update, nil) + h.ackUpdate(&session2.ID, 1, 1, nil) + + update = randCommittedUpdateForChannel(h.t, chanID7, 2) + h.commitUpdate(&session2.ID, update, nil) + + // Check that attempting to delete the session will fail since it is not + // yet considered closable. + h.deleteSession(session2.ID, wtdb.ErrSessionNotClosable) + + // Now delete the added committed updates. This should put the + // session in the terminal state after which we should be able to + // delete the session. + h.deleteCommittedUpdates(&session2.ID, nil) + + // Marking channel 7 as closed will now return session 2 since it has + // been marked as terminal. + sl = h.markChannelClosed(chanID7, 1, nil) + require.ElementsMatch(h.t, sl, []wtdb.SessionID{session2.ID}) + + // We should now be able to delete the session. + h.deleteSession(session2.ID, nil) } // testAckUpdate asserts the behavior of AckUpdate.