wtclient: demo un-acked update of closed channel bug

This commit adds a new test to the tower client to demonstrate a bug
that can happen if a channel is closed while an update for it has yet to
be acked by the tower server. This will be fixed in an upcomming commit.
This commit is contained in:
Elle Mouton 2023-09-13 12:33:51 +02:00
parent ff0d8fc619
commit adb87dcfb8
No known key found for this signature in database
GPG Key ID: D7D916376026F177

View File

@ -2416,6 +2416,144 @@ var clientTests = []clientTest{
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
// This test demonstrates a bug that will be addressed in a
// follow-up commit. It shows that if a channel is closed while
// an update for that channel still exists in an in-memory queue
// somewhere then it is possible that all the data for that
// channel gets deleted from the tower client DB. This results
// in an error being thrown in the DB AckUpdate method since it
// will try to find the associated channel data but will not
// find it.
name: "channel closed while update is un-acked",
cfg: harnessCfg{
localBalance: localBalance,
remoteBalance: remoteBalance,
policy: wtpolicy.Policy{
TxPolicy: defaultTxPolicy,
MaxUpdates: 5,
},
},
fn: func(h *testHarness) {
const (
numUpdates = 10
chanIDInt = 0
)
h.sendUpdatesOn = true
// Advance the channel with a few updates.
hints := h.advanceChannelN(chanIDInt, numUpdates)
// Backup a few these updates and wait for them to
// arrive at the server. Note that we back up enough
// updates to saturate the session so that the session
// is considered closable when the channel is deleted.
h.backupStates(chanIDInt, 0, numUpdates/2, nil)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)
// Now, restart the server in a state where it will not
// ack updates. This will allow us to wait for an
// update to be un-acked and persisted.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = true
})
// Backup a few more of the update. These should remain
// in the client as un-acked.
h.backupStates(
chanIDInt, numUpdates/2, numUpdates-1, nil,
)
// Wait for the tasks to be bound to sessions.
fetchSessions := h.clientDB.FetchSessionCommittedUpdates
err := wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
nil,
)
require.NoError(h.t, err)
var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)
if len(updates) != numUpdates-1 {
return true
}
}
return false
}, waitTime)
require.NoError(h.t, err)
// Now we close this channel while the update for it has
// not yet been acked.
h.closeChannel(chanIDInt, 1)
// Closable sessions should now be one.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)
return len(cs) == 1
}, waitTime)
require.NoError(h.t, err)
// Now, restart the server and allow it to ack updates
// again.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = false
})
// Mine a few blocks so that the session close range is
// surpassed.
h.mine(3)
// Wait for there to be no more closable sessions on the
// client side.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)
return len(cs) == 0
}, waitTime)
require.NoError(h.t, err)
// Wait for channel to be "unregistered".
chanID := chanIDFromInt(chanIDInt)
err = wait.Predicate(func() bool {
err := h.client.BackupState(&chanID, 0)
return errors.Is(
err, wtclient.ErrUnregisteredChannel,
)
}, waitTime)
require.NoError(h.t, err)
// Show that the committed update for the closed channel
// remains in the client's DB.
err = wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
nil,
)
require.NoError(h.t, err)
var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)
if len(updates) != 0 {
return true
}
}
return false
}, waitTime)
require.NoError(h.t, err)
},
},
}
// TestClient executes the client test suite, asserting the ability to backup