From adb87dcfb8fe985b192d71b7c899f046af89eb86 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 13 Sep 2023 12:33:51 +0200 Subject: [PATCH] wtclient: demo un-acked update of closed channel bug This commit adds a new test to the tower client to demonstrate a bug that can happen if a channel is closed while an update for it has yet to be acked by the tower server. This will be fixed in an upcomming commit. --- watchtower/wtclient/client_test.go | 138 +++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index a5b774c10..4f375f487 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2416,6 +2416,144 @@ var clientTests = []clientTest{ server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, + { + // This test demonstrates a bug that will be addressed in a + // follow-up commit. It shows that if a channel is closed while + // an update for that channel still exists in an in-memory queue + // somewhere then it is possible that all the data for that + // channel gets deleted from the tower client DB. This results + // in an error being thrown in the DB AckUpdate method since it + // will try to find the associated channel data but will not + // find it. + name: "channel closed while update is un-acked", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 10 + chanIDInt = 0 + ) + + h.sendUpdatesOn = true + + // Advance the channel with a few updates. + hints := h.advanceChannelN(chanIDInt, numUpdates) + + // Backup a few these updates and wait for them to + // arrive at the server. Note that we back up enough + // updates to saturate the session so that the session + // is considered closable when the channel is deleted. + h.backupStates(chanIDInt, 0, numUpdates/2, nil) + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) + + // Now, restart the server in a state where it will not + // ack updates. This will allow us to wait for an + // update to be un-acked and persisted. + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) + + // Backup a few more of the update. These should remain + // in the client as un-acked. + h.backupStates( + chanIDInt, numUpdates/2, numUpdates-1, nil, + ) + + // Wait for the tasks to be bound to sessions. + fetchSessions := h.clientDB.FetchSessionCommittedUpdates + err := wait.Predicate(func() bool { + sessions, err := h.clientDB.ListClientSessions( + nil, + ) + require.NoError(h.t, err) + + var updates []wtdb.CommittedUpdate + for id := range sessions { + updates, err = fetchSessions(&id) + require.NoError(h.t, err) + + if len(updates) != numUpdates-1 { + return true + } + } + + return false + }, waitTime) + require.NoError(h.t, err) + + // Now we close this channel while the update for it has + // not yet been acked. + h.closeChannel(chanIDInt, 1) + + // Closable sessions should now be one. + err = wait.Predicate(func() bool { + cs, err := h.clientDB.ListClosableSessions() + require.NoError(h.t, err) + + return len(cs) == 1 + }, waitTime) + require.NoError(h.t, err) + + // Now, restart the server and allow it to ack updates + // again. + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = false + }) + + // Mine a few blocks so that the session close range is + // surpassed. + h.mine(3) + + // Wait for there to be no more closable sessions on the + // client side. + err = wait.Predicate(func() bool { + cs, err := h.clientDB.ListClosableSessions() + require.NoError(h.t, err) + + return len(cs) == 0 + }, waitTime) + require.NoError(h.t, err) + + // Wait for channel to be "unregistered". + chanID := chanIDFromInt(chanIDInt) + err = wait.Predicate(func() bool { + err := h.client.BackupState(&chanID, 0) + + return errors.Is( + err, wtclient.ErrUnregisteredChannel, + ) + }, waitTime) + require.NoError(h.t, err) + + // Show that the committed update for the closed channel + // remains in the client's DB. + err = wait.Predicate(func() bool { + sessions, err := h.clientDB.ListClientSessions( + nil, + ) + require.NoError(h.t, err) + + var updates []wtdb.CommittedUpdate + for id := range sessions { + updates, err = fetchSessions(&id) + require.NoError(h.t, err) + + if len(updates) != 0 { + return true + } + } + + return false + }, waitTime) + require.NoError(h.t, err) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup