Merge pull request #8233 from ellemouton/unackedUpdatesBug

wtclient: handle un-acked updates for exhausted sessions
This commit is contained in:
Elle 2024-01-12 09:49:20 +02:00 committed by GitHub
commit 2824fe27d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 133 additions and 16 deletions

View File

@ -62,6 +62,9 @@
protects against the case where htlcs are added asynchronously resulting in
stuck channels.
* [Properly handle un-acked updates for exhausted watchtower
sessions](https://github.com/lightningnetwork/lnd/pull/8233)
# New Features
## Functional Enhancements

View File

@ -66,10 +66,14 @@ func (c *client) genSessionFilter(
}
// ExhaustedSessionFilter constructs a wtdb.ClientSessionFilterFn filter
// function that will filter out any sessions that have been exhausted.
func ExhaustedSessionFilter() wtdb.ClientSessionFilterFn {
return func(session *wtdb.ClientSession) bool {
return session.SeqNum < session.Policy.MaxUpdates
// function that will filter out any sessions that have been exhausted. A
// session is considered exhausted only if it has no un-acked updates and the
// sequence number of the session is equal to the max updates of the session
// policy.
func ExhaustedSessionFilter() wtdb.ClientSessWithNumCommittedUpdatesFilterFn {
return func(session *wtdb.ClientSession, numUnAcked uint16) bool {
return session.SeqNum < session.Policy.MaxUpdates ||
numUnAcked > 0
}
}

View File

@ -2417,6 +2417,104 @@ var clientTests = []clientTest{
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
// Previously we would not load a session into memory if its
// seq num was equal to its max-updates. This meant that we
// would then not properly handle any committed updates for the
// session, meaning that we would then not remove the tower if
// needed. This test demonstrates that this has been fixed.
name: "can remove tower with an un-acked update in " +
"an exhausted session after a restart",
cfg: harnessCfg{
localBalance: localBalance,
remoteBalance: remoteBalance,
policy: wtpolicy.Policy{
TxPolicy: defaultTxPolicy,
MaxUpdates: 5,
},
},
fn: func(h *testHarness) {
const (
numUpdates = 5
chanID = 0
)
// Generate numUpdates retributions.
hints := h.advanceChannelN(chanID, numUpdates)
// Back up all but one of the updates so that the
// session is almost full.
h.backupStates(chanID, 0, numUpdates-1, nil)
// Wait for the updates to be populated in the server's
// database.
h.server.waitForUpdates(hints[:numUpdates-1], waitTime)
// Now stop the server and restart it with the
// NoAckUpdates set to true.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = true
})
// Back up the remaining task. This will bind the
// backup task to the session with the server. The
// client will also attempt to get the ack for one
// update which will cause a CommittedUpdate to be
// persisted which will also mean that the SeqNum of the
// session is now equal to MaxUpdates of the session
// policy.
h.backupStates(chanID, numUpdates-1, numUpdates, nil)
tower, err := h.clientDB.LoadTower(
h.server.addr.IdentityKey,
)
require.NoError(h.t, err)
// Wait till the updates have been persisted.
err = wait.Predicate(func() bool {
var numCommittedUpdates int
countUpdates := func(_ *wtdb.ClientSession,
update *wtdb.CommittedUpdate) {
numCommittedUpdates++
}
_, err := h.clientDB.ListClientSessions(
&tower.ID, wtdb.WithPerCommittedUpdate(
countUpdates,
),
)
require.NoError(h.t, err)
return numCommittedUpdates == 1
}, waitTime)
require.NoError(h.t, err)
// Now restart the client. On restart, the previous
// session should still be loaded even though it is
// exhausted since it has an un-acked update.
require.NoError(h.t, h.clientMgr.Stop())
h.startClient()
// Now remove the tower.
err = h.clientMgr.RemoveTower(
h.server.addr.IdentityKey, nil,
)
require.NoError(h.t, err)
// Add a new tower.
server2 := newServerHarness(
h.t, h.net, towerAddr2Str, nil,
)
server2.start()
h.addTower(server2.addr)
// Now we assert that the backups are backed up to the
// new tower.
server2.waitForUpdates(hints[numUpdates-1:], waitTime)
},
},
{
// This test shows that if a channel is closed while an update
// for that channel still exists in an in-memory queue

View File

@ -2305,6 +2305,12 @@ func getClientSessionBody(sessions kvdb.RBucket,
// that read sessions from the DB.
type ClientSessionFilterFn func(*ClientSession) bool
// ClientSessWithNumCommittedUpdatesFilterFn describes the signature of a
// callback function that can be used to filter out a session based on the
// contents of ClientSession along with the number of un-acked committed updates
// that the session has.
type ClientSessWithNumCommittedUpdatesFilterFn func(*ClientSession, uint16) bool
// PerMaxHeightCB describes the signature of a callback function that can be
// called for each channel that a session has updates for to communicate the
// maximum commitment height that the session has backed up for the channel.
@ -2366,7 +2372,7 @@ type ClientSessionListCfg struct {
// functions in ClientSessionListCfg. If a session fails this filter
// function then all it means is that it won't be included in the list
// of sessions to return.
PostEvaluateFilterFn ClientSessionFilterFn
PostEvaluateFilterFn ClientSessWithNumCommittedUpdatesFilterFn
}
// NewClientSessionCfg constructs a new ClientSessionListCfg.
@ -2427,7 +2433,9 @@ func WithPreEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption {
// run against the other ClientSessionListCfg call-backs) whereas the session
// will only reach the PostEvalFilterFn call-back once it has already been
// evaluated by all the other call-backs.
func WithPostEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption {
func WithPostEvalFilterFn(
fn ClientSessWithNumCommittedUpdatesFilterFn) ClientSessionListOption {
return func(cfg *ClientSessionListCfg) {
cfg.PostEvaluateFilterFn = fn
}
@ -2459,7 +2467,7 @@ func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket,
// Pass the session's committed (un-acked) updates through the call-back
// if one is provided.
err = filterClientSessionCommits(
numCommittedUpdates, err := filterClientSessionCommits(
sessionBkt, session, cfg.PerCommittedUpdate,
)
if err != nil {
@ -2477,7 +2485,7 @@ func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket,
}
if cfg.PostEvaluateFilterFn != nil &&
!cfg.PostEvaluateFilterFn(session) {
!cfg.PostEvaluateFilterFn(session, numCommittedUpdates) {
return nil, ErrSessionFailedFilterFn
}
@ -2586,18 +2594,21 @@ func (c *ClientDB) filterClientSessionAcks(sessionBkt,
// identified by the serialized session id and passes them to the given
// PerCommittedUpdateCB callback.
func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
cb PerCommittedUpdateCB) error {
if cb == nil {
return nil
}
cb PerCommittedUpdateCB) (uint16, error) {
sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
if sessionCommits == nil {
return nil
return 0, nil
}
var numUpdates uint16
err := sessionCommits.ForEach(func(k, v []byte) error {
numUpdates++
if cb == nil {
return nil
}
var committedUpdate CommittedUpdate
err := committedUpdate.Decode(bytes.NewReader(v))
if err != nil {
@ -2606,13 +2617,14 @@ func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
committedUpdate.SeqNum = byteOrder.Uint16(k)
cb(s, &committedUpdate)
return nil
})
if err != nil {
return err
return 0, err
}
return nil
return numUpdates, nil
}
// putClientSessionBody stores the body of the ClientSession (everything but the