diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md index f59bfee7a..b3a388052 100644 --- a/docs/release-notes/release-notes-0.18.0.md +++ b/docs/release-notes/release-notes-0.18.0.md @@ -62,6 +62,9 @@ protects against the case where htlcs are added asynchronously resulting in stuck channels. +* [Properly handle un-acked updates for exhausted watchtower + sessions](https://github.com/lightningnetwork/lnd/pull/8233) + # New Features ## Functional Enhancements diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 24d974ced..fa76ebd58 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -66,10 +66,14 @@ func (c *client) genSessionFilter( } // ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter -// function that will filter out any sessions that have been exhausted. -func ExhaustedSessionFilter() wtdb.ClientSessionFilterFn { - return func(session *wtdb.ClientSession) bool { - return session.SeqNum < session.Policy.MaxUpdates +// function that will filter out any sessions that have been exhausted. A +// session is considered exhausted only if it has no un-acked updates and the +// sequence number of the session is equal to the max updates of the session +// policy. +func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn { + return func(session *wtdb.ClientSession, numUnAcked uint16) bool { + return session.SeqNum < session.Policy.MaxUpdates || + numUnAcked > 0 } } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 95e2664eb..c2380dbff 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2417,6 +2417,104 @@ var clientTests = []clientTest{ server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, + { + // Previously we would not load a session into memory if its + // seq num was equal to it's max-updates. This meant that we + // would then not properly handle any committed updates for the + // session meaning that we would then not remove the tower if + // needed. This test demonstrates that this has been fixed. + name: "can remove tower with an un-acked update in " + + "an exhausted session after a restart", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions. + hints := h.advanceChannelN(chanID, numUpdates) + + // Back up all but one of the updates so that the + // session is almost full. + h.backupStates(chanID, 0, numUpdates-1, nil) + + // Wait for the updates to be populated in the server's + // database. + h.server.waitForUpdates(hints[:numUpdates-1], waitTime) + + // Now stop the server and restart it with the + // NoAckUpdates set to true. + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) + + // Back up the remaining task. This will bind the + // backup task to the session with the server. The + // client will also attempt to get the ack for one + // update which will cause a CommittedUpdate to be + // persisted which will also mean that the SeqNum of the + // session is now equal to MaxUpdates of the session + // policy. + h.backupStates(chanID, numUpdates-1, numUpdates, nil) + + tower, err := h.clientDB.LoadTower( + h.server.addr.IdentityKey, + ) + require.NoError(h.t, err) + + // Wait till the updates have been persisted. + err = wait.Predicate(func() bool { + var numCommittedUpdates int + countUpdates := func(_ *wtdb.ClientSession, + update *wtdb.CommittedUpdate) { + + numCommittedUpdates++ + } + + _, err := h.clientDB.ListClientSessions( + &tower.ID, wtdb.WithPerCommittedUpdate( + countUpdates, + ), + ) + require.NoError(h.t, err) + + return numCommittedUpdates == 1 + + }, waitTime) + require.NoError(h.t, err) + + // Now restart the client. On restart, the previous + // session should still be loaded even though it is + // exhausted since it has an un-acked update. + require.NoError(h.t, h.clientMgr.Stop()) + h.startClient() + + // Now remove the tower. + err = h.clientMgr.RemoveTower( + h.server.addr.IdentityKey, nil, + ) + require.NoError(h.t, err) + + // Add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, + ) + server2.start() + h.addTower(server2.addr) + + // Now we assert that the backups are backed up to the + // new tower. + server2.waitForUpdates(hints[numUpdates-1:], waitTime) + }, + }, { // This test shows that if a channel is closed while an update // for that channel still exists in an in-memory queue diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 635c6cfa8..3a8eb6725 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -2305,6 +2305,12 @@ func getClientSessionBody(sessions kvdb.RBucket, // that read sessions from the DB. type ClientSessionFilterFn func(*ClientSession) bool +// ClientSessWithNumCommittedUpdatesFilterFn describes the signature of a +// callback function that can be used to filter out a session based on the +// contents of ClientSession along with the number of un-acked committed updates +// that the session has. +type ClientSessWithNumCommittedUpdatesFilterFn func(*ClientSession, uint16) bool + // PerMaxHeightCB describes the signature of a callback function that can be // called for each channel that a session has updates for to communicate the // maximum commitment height that the session has backed up for the channel. @@ -2366,7 +2372,7 @@ type ClientSessionListCfg struct { // functions in ClientSessionListCfg. If a session fails this filter // function then all it means is that it won't be included in the list // of sessions to return. - PostEvaluateFilterFn ClientSessionFilterFn + PostEvaluateFilterFn ClientSessWithNumCommittedUpdatesFilterFn } // NewClientSessionCfg constructs a new ClientSessionListCfg. @@ -2427,7 +2433,9 @@ func WithPreEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption { // run against the other ClientSessionListCfg call-backs) whereas the session // will only reach the PostEvalFilterFn call-back once it has already been // evaluated by all the other call-backs. -func WithPostEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption { +func WithPostEvalFilterFn( + fn ClientSessWithNumCommittedUpdatesFilterFn) ClientSessionListOption { + return func(cfg *ClientSessionListCfg) { cfg.PostEvaluateFilterFn = fn } @@ -2459,7 +2467,7 @@ func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket, // Pass the session's committed (un-acked) updates through the call-back // if one is provided. - err = filterClientSessionCommits( + numCommittedUpdates, err := filterClientSessionCommits( sessionBkt, session, cfg.PerCommittedUpdate, ) if err != nil { @@ -2477,7 +2485,7 @@ func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket, } if cfg.PostEvaluateFilterFn != nil && - !cfg.PostEvaluateFilterFn(session) { + !cfg.PostEvaluateFilterFn(session, numCommittedUpdates) { return nil, ErrSessionFailedFilterFn } @@ -2586,18 +2594,21 @@ func (c *ClientDB) filterClientSessionAcks(sessionBkt, // identified by the serialized session id and passes them to the given // PerCommittedUpdateCB callback. func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession, - cb PerCommittedUpdateCB) error { - - if cb == nil { - return nil - } + cb PerCommittedUpdateCB) (uint16, error) { sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits) if sessionCommits == nil { - return nil + return 0, nil } + var numUpdates uint16 err := sessionCommits.ForEach(func(k, v []byte) error { + numUpdates++ + + if cb == nil { + return nil + } + var committedUpdate CommittedUpdate err := committedUpdate.Decode(bytes.NewReader(v)) if err != nil { @@ -2606,13 +2617,14 @@ func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession, committedUpdate.SeqNum = byteOrder.Uint16(k) cb(s, &committedUpdate) + return nil }) if err != nil { - return err + return 0, err } - return nil + return numUpdates, nil } // putClientSessionBody stores the body of the ClientSession (everything but the