From 2ed5788de319c935b72bfbf5a5f2b67ffdc42482 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 09:33:31 +0200 Subject: [PATCH 01/13] wtclient: separate server from testHarness In this commit, a serverHarness is added to the test framework. This will allow us to create more mock servers appart from the main server. --- watchtower/wtclient/client_test.go | 286 +++++++++++++++++------------ 1 file changed, 165 insertions(+), 121 deletions(-) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 8dc956a2c..771f60a8d 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -38,6 +38,8 @@ import ( const ( towerAddrStr = "18.28.243.2:9911" towerAddr2Str = "19.29.244.3:9912" + + timeout = 200 * time.Millisecond ) var ( @@ -392,18 +394,15 @@ func (c *mockChannel) getState( } type testHarness struct { - t *testing.T - cfg harnessCfg - signer *wtmock.MockSigner - capacity lnwire.MilliSatoshi - clientDB *wtmock.ClientDB - clientCfg *wtclient.Config - client wtclient.Client - serverAddr *lnwire.NetAddress - serverDB *wtmock.TowerDB - serverCfg *wtserver.Config - server *wtserver.Server - net *mockNet + t *testing.T + cfg harnessCfg + signer *wtmock.MockSigner + capacity lnwire.MilliSatoshi + clientDB *wtmock.ClientDB + clientCfg *wtclient.Config + client wtclient.Client + server *serverHarness + net *mockNet blockEvents *mockBlockSub height int32 @@ -428,47 +427,23 @@ type harnessCfg struct { } func newHarness(t *testing.T, cfg harnessCfg) *testHarness { - towerTCPAddr, err := net.ResolveTCPAddr("tcp", towerAddrStr) - require.NoError(t, err, "Unable to resolve tower TCP addr") - - privKey, err := btcec.NewPrivateKey() - require.NoError(t, err, "Unable to generate tower private key") - privKeyECDH := &keychain.PrivKeyECDH{PrivKey: privKey} - - towerPubKey := privKey.PubKey() - - towerAddr := &lnwire.NetAddress{ - IdentityKey: towerPubKey, - Address: towerTCPAddr, - } - - const timeout = 200 * time.Millisecond - serverDB := wtmock.NewTowerDB() - - serverCfg := &wtserver.Config{ - DB: serverDB, - ReadTimeout: timeout, - WriteTimeout: timeout, - NodeKeyECDH: privKeyECDH, - NewAddress: func() (btcutil.Address, error) { - return addr, nil - }, - NoAckCreateSession: cfg.noAckCreateSession, - } - signer := wtmock.NewMockSigner() mockNet := newMockNet() clientDB := wtmock.NewClientDB() + server := newServerHarness( + t, mockNet, towerAddrStr, func(serverCfg *wtserver.Config) { + serverCfg.NoAckCreateSession = cfg.noAckCreateSession + }, + ) + h := &testHarness{ t: t, cfg: cfg, signer: signer, capacity: cfg.localBalance + cfg.remoteBalance, clientDB: clientDB, - serverAddr: towerAddr, - serverDB: serverDB, - serverCfg: serverCfg, + server: server, net: mockNet, blockEvents: newMockBlockSub(t), channelEvents: newMockSubscription(t), @@ -528,8 +503,8 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { } if !cfg.noServerStart { - h.startServer() - t.Cleanup(h.stopServer) + h.server.start() + t.Cleanup(h.server.stop) } h.startClient() @@ -553,29 +528,6 @@ func (h *testHarness) mine(numBlocks int) { } } -// startServer creates a new server using the harness's current serverCfg and -// starts it after pointing the mockNet's callback to the new server. 
-func (h *testHarness) startServer() { - h.t.Helper() - - var err error - h.server, err = wtserver.New(h.serverCfg) - require.NoError(h.t, err) - - h.net.registerConnCallback(h.serverAddr, h.server.InboundPeerConnected) - - require.NoError(h.t, h.server.Start()) -} - -// stopServer stops the main harness server. -func (h *testHarness) stopServer() { - h.t.Helper() - - h.net.removeConnCallback(h.serverAddr) - - require.NoError(h.t, h.server.Stop()) -} - // startClient creates a new server using the harness's current clientCf and // starts it. func (h *testHarness) startClient() { @@ -584,7 +536,7 @@ func (h *testHarness) startClient() { towerTCPAddr, err := net.ResolveTCPAddr("tcp", towerAddrStr) require.NoError(h.t, err) towerAddr := &lnwire.NetAddress{ - IdentityKey: h.serverCfg.NodeKeyECDH.PubKey(), + IdentityKey: h.server.cfg.NodeKeyECDH.PubKey(), Address: towerTCPAddr, } @@ -817,7 +769,7 @@ func (h *testHarness) waitServerUpdates(hints []blob.BreachHint, for { select { case <-time.After(time.Second): - matches, err := h.serverDB.QueryMatches(hints) + matches, err := h.server.db.QueryMatches(hints) require.NoError(h.t, err, "unable to query for hints") if wantUpdates && serverHasHints(matches) { @@ -830,7 +782,7 @@ func (h *testHarness) waitServerUpdates(hints []blob.BreachHint, } case <-failTimeout: - matches, err := h.serverDB.QueryMatches(hints) + matches, err := h.server.db.QueryMatches(hints) require.NoError(h.t, err, "unable to query for hints") require.Truef(h.t, serverHasHints(matches), "breach "+ "hints not received, only got %d/%d", @@ -847,7 +799,7 @@ func (h *testHarness) assertUpdatesForPolicy(hints []blob.BreachHint, expPolicy wtpolicy.Policy) { // Query for matches on the provided hints. - matches, err := h.serverDB.QueryMatches(hints) + matches, err := h.server.db.QueryMatches(hints) require.NoError(h.t, err) // Assert that the number of matches is exactly the number of provided @@ -996,6 +948,96 @@ func (m *mockBlockSub) sendNewBlock(height int32) { } } +// serverHarness represents a mock watchtower server. +type serverHarness struct { + t *testing.T + net *mockNet + cfg *wtserver.Config + addr *lnwire.NetAddress + db *wtmock.TowerDB + server *wtserver.Server +} + +// newServerHarness constructs a new mock watchtower server. +func newServerHarness(t *testing.T, mockNet *mockNet, netAddr string, + opt func(cfg *wtserver.Config)) *serverHarness { + + towerTCPAddr, err := net.ResolveTCPAddr("tcp", netAddr) + require.NoError(t, err, "Unable to resolve tower TCP addr") + + privKey, err := btcec.NewPrivateKey() + require.NoError(t, err, "Unable to generate tower private key") + + privKeyECDH := &keychain.PrivKeyECDH{PrivKey: privKey} + + towerPubKey := privKey.PubKey() + towerAddr := &lnwire.NetAddress{ + IdentityKey: towerPubKey, + Address: towerTCPAddr, + } + + db := wtmock.NewTowerDB() + cfg := &wtserver.Config{ + DB: db, + ReadTimeout: timeout, + WriteTimeout: timeout, + NodeKeyECDH: privKeyECDH, + NewAddress: func() (btcutil.Address, error) { + return addr, nil + }, + } + + if opt != nil { + opt(cfg) + } + + server, err := wtserver.New(cfg) + require.NoError(t, err, "unable to create wtserver") + + return &serverHarness{ + t: t, + net: mockNet, + cfg: cfg, + db: db, + addr: towerAddr, + server: server, + } +} + +// start creates a new server using the harness's current server cfg and starts +// it after registering its Dial callback with the mockNet. 
+func (s *serverHarness) start() { + s.t.Helper() + + var err error + s.server, err = wtserver.New(s.cfg) + require.NoError(s.t, err) + + s.net.registerConnCallback(s.addr, s.server.InboundPeerConnected) + require.NoError(s.t, s.server.Start()) +} + +// stop halts the server and removes its Dial callback from the mockNet. +func (s *serverHarness) stop() { + s.t.Helper() + + require.NoError(s.t, s.server.Stop()) + s.net.removeConnCallback(s.addr) +} + +// restart stops the server, applies any given config tweaks and then starts the +// server again. +func (s *serverHarness) restart(op func(cfg *wtserver.Config)) { + s.stop() + defer s.start() + + if op == nil { + return + } + + op(s.cfg) +} + const ( localBalance = lnwire.MilliSatoshi(100000000) remoteBalance = lnwire.MilliSatoshi(200000000) @@ -1161,9 +1203,9 @@ var clientTests = []clientTest{ // Now, restart the server and prevent it from acking // state updates. - h.stopServer() - h.serverCfg.NoAckUpdates = true - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) // Send the next state update to the tower. Since the // tower isn't acking state updates, we expect this @@ -1181,9 +1223,9 @@ var clientTests = []clientTest{ // Restart the server and allow it to ack the updates // after the client retransmits the unacked update. - h.stopServer() - h.serverCfg.NoAckUpdates = false - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = false + }) // Restart the client and allow it to process the // committed update. @@ -1228,9 +1270,9 @@ var clientTests = []clientTest{ // Restart the server and prevent it from acking state // updates. - h.stopServer() - h.serverCfg.NoAckUpdates = true - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) @@ -1242,9 +1284,9 @@ var clientTests = []clientTest{ // Restart the server and allow it to ack the updates // after the client retransmits the unacked updates. - h.stopServer() - h.serverCfg.NoAckUpdates = false - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = false + }) // Wait for all the updates to be populated in the // server's database. @@ -1376,9 +1418,9 @@ var clientTests = []clientTest{ // Restart the server and allow it to ack session // creation. - h.stopServer() - h.serverCfg.NoAckCreateSession = false - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckCreateSession = false + }) // Restart the client with the same policy, which will // immediately try to overwrite the old session with an @@ -1426,9 +1468,9 @@ var clientTests = []clientTest{ // Restart the server and allow it to ack session // creation. - h.stopServer() - h.serverCfg.NoAckCreateSession = false - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckCreateSession = false + }) // Restart the client with a new policy, which will // immediately try to overwrite the prior session with @@ -1575,7 +1617,7 @@ var clientTests = []clientTest{ // Fully remove the tower, causing its existing sessions // to be marked inactive. - h.removeTower(h.serverAddr.IdentityKey, nil) + h.removeTower(h.server.addr.IdentityKey, nil) // Back up the remaining states. Since the tower has // been removed, it shouldn't receive any updates. @@ -1585,18 +1627,20 @@ var clientTests = []clientTest{ // Re-add the tower. 
We prevent the tower from acking // session creation to ensure the inactive sessions are // not used. - h.stopServer() - h.serverCfg.NoAckCreateSession = true - h.startServer() - h.addTower(h.serverAddr) + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckCreateSession = true + }) + + h.addTower(h.server.addr) h.waitServerUpdates(nil, time.Second) // Finally, allow the tower to ack session creation, // allowing the state updates to be sent through the new // session. - h.stopServer() - h.serverCfg.NoAckCreateSession = false - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckCreateSession = false + }) + h.waitServerUpdates(hints[numUpdates/2:], waitTime) }, }, @@ -1630,9 +1674,9 @@ var clientTests = []clientTest{ // Now, restart the tower and prevent it from acking any // new sessions. We do this here as once the last slot // is exhausted the client will attempt to renegotiate. - h.stopServer() - h.serverCfg.NoAckCreateSession = true - h.startServer() + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckCreateSession = true + }) // Back up the remaining two states. Once the first is // processed, the session will be exhausted but the @@ -1647,7 +1691,7 @@ var clientTests = []clientTest{ // state to process. After the force quite delay // expires, the client should force quite itself and // allow the test to complete. - h.stopServer() + h.server.stop() }, }, { @@ -1680,7 +1724,7 @@ var clientTests = []clientTest{ h.waitServerUpdates(hints[:len(hints)/2], waitTime) // Stop the server. - h.stopServer() + h.server.stop() // Change the address of the server. towerTCPAddr, err := net.ResolveTCPAddr( @@ -1688,12 +1732,12 @@ var clientTests = []clientTest{ ) require.NoError(h.t, err) - oldAddr := h.serverAddr.Address + oldAddr := h.server.addr.Address towerAddr := &lnwire.NetAddress{ - IdentityKey: h.serverAddr.IdentityKey, + IdentityKey: h.server.addr.IdentityKey, Address: towerTCPAddr, } - h.serverAddr = towerAddr + h.server.addr = towerAddr // Add the new tower address to the client. err = h.client.AddTower(towerAddr) @@ -1706,7 +1750,7 @@ var clientTests = []clientTest{ require.NoError(h.t, err) // Restart the server. - h.startServer() + h.server.start() // Now attempt to back up the rest of the updates. h.backupStates(chanID, numUpdates/2, maxUpdates, nil) @@ -1735,7 +1779,7 @@ var clientTests = []clientTest{ // the client should be able to remove the server. err := wait.NoError(func() error { return h.client.RemoveTower( - h.serverAddr.IdentityKey, nil, + h.server.addr.IdentityKey, nil, ) }, waitTime) require.NoError(h.t, err) @@ -1744,12 +1788,12 @@ var clientTests = []clientTest{ // when the client calls it. This will force the client // to remain in the state where it has locked the // address of the server. - h.server, err = wtserver.New(h.serverCfg) + h.server.server, err = wtserver.New(h.server.cfg) require.NoError(h.t, err) cancel := make(chan struct{}) h.net.registerConnCallback( - h.serverAddr, func(peer wtserver.Peer) { + h.server.addr, func(peer wtserver.Peer) { select { case <-h.quit: case <-cancel: @@ -1764,20 +1808,20 @@ var clientTests = []clientTest{ require.NoError(h.t, err) towerAddr := &lnwire.NetAddress{ - IdentityKey: h.serverAddr.IdentityKey, + IdentityKey: h.server.addr.IdentityKey, Address: towerTCPAddr, } // Register the new address in the mock-net. h.net.registerConnCallback( - towerAddr, h.server.InboundPeerConnected, + towerAddr, h.server.server.InboundPeerConnected, ) // Now start the server. 
- require.NoError(h.t, h.server.Start()) + require.NoError(h.t, h.server.server.Start()) // Re-add the server to the client - err = h.client.AddTower(h.serverAddr) + err = h.client.AddTower(h.server.addr) require.NoError(h.t, err) // Also add the new tower address. @@ -1790,8 +1834,8 @@ var clientTests = []clientTest{ // negotiation. err = wait.Predicate(func() bool { err = h.client.RemoveTower( - h.serverAddr.IdentityKey, - h.serverAddr.Address, + h.server.addr.IdentityKey, + h.server.addr.Address, ) return errors.Is(err, wtclient.ErrAddrInUse) }, waitTime) @@ -1801,7 +1845,7 @@ var clientTests = []clientTest{ // it is not being used for session negotiation. err = wait.NoError(func() error { return h.client.RemoveTower( - h.serverAddr.IdentityKey, towerTCPAddr, + h.server.addr.IdentityKey, towerTCPAddr, ) }, waitTime) require.NoError(h.t, err) @@ -1813,7 +1857,7 @@ var clientTests = []clientTest{ // address. err = wait.NoError(func() error { return h.client.RemoveTower( - h.serverAddr.IdentityKey, nil, + h.server.addr.IdentityKey, nil, ) }, waitTime) require.NoError(h.t, err) @@ -1970,7 +2014,7 @@ var clientTests = []clientTest{ // Assert that the server is also aware of all of these // sessions. for sid := range closableSess { - _, err := h.serverDB.GetSessionInfo(&sid) + _, err := h.server.db.GetSessionInfo(&sid) require.NoError(h.t, err) } @@ -2004,7 +2048,7 @@ var clientTests = []clientTest{ return false } - _, err := h.serverDB.GetSessionInfo( + _, err := h.server.db.GetSessionInfo( &sid, ) if !errors.Is( @@ -2107,7 +2151,7 @@ var clientTests = []clientTest{ // Restart the Client (force quit). And also now start // the server. h.client.ForceQuit() - h.startServer() + h.server.start() h.startClient() // Back up a few more states. From d979f593319e5b92696fcef6b87638dfb908aca3 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 09:39:04 +0200 Subject: [PATCH 02/13] wtclient: move server specific methods to serverHarness --- watchtower/wtclient/client_test.go | 246 +++++++++++++++-------------- 1 file changed, 126 insertions(+), 120 deletions(-) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 771f60a8d..1507029e2 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -729,92 +729,6 @@ func (h *testHarness) recvPayments(id, from, to uint64, return hints } -// waitServerUpdates blocks until the breach hints provided all appear in the -// watchtower's database or the timeout expires. This is used to test that the -// client in fact sends the updates to the server, even if it is offline. -func (h *testHarness) waitServerUpdates(hints []blob.BreachHint, - timeout time.Duration) { - - h.t.Helper() - - // If no breach hints are provided, we will wait out the full timeout to - // assert that no updates appear. - wantUpdates := len(hints) > 0 - - hintSet := make(map[blob.BreachHint]struct{}) - for _, hint := range hints { - hintSet[hint] = struct{}{} - } - - require.Lenf(h.t, hints, len(hintSet), "breach hints are not unique, "+ - "list-len: %d set-len: %d", len(hints), len(hintSet)) - - // Closure to assert the server's matches are consistent with the hint - // set. 
- serverHasHints := func(matches []wtdb.Match) bool { - if len(hintSet) != len(matches) { - return false - } - - for _, match := range matches { - _, ok := hintSet[match.Hint] - require.Truef(h.t, ok, "match %v in db is not in "+ - "hint set", match.Hint) - } - - return true - } - - failTimeout := time.After(timeout) - for { - select { - case <-time.After(time.Second): - matches, err := h.server.db.QueryMatches(hints) - require.NoError(h.t, err, "unable to query for hints") - - if wantUpdates && serverHasHints(matches) { - return - } - - if wantUpdates { - h.t.Logf("Received %d/%d\n", len(matches), - len(hints)) - } - - case <-failTimeout: - matches, err := h.server.db.QueryMatches(hints) - require.NoError(h.t, err, "unable to query for hints") - require.Truef(h.t, serverHasHints(matches), "breach "+ - "hints not received, only got %d/%d", - len(matches), len(hints)) - return - } - } -} - -// assertUpdatesForPolicy queries the server db for matches using the provided -// breach hints, then asserts that each match has a session with the expected -// policy. -func (h *testHarness) assertUpdatesForPolicy(hints []blob.BreachHint, - expPolicy wtpolicy.Policy) { - - // Query for matches on the provided hints. - matches, err := h.server.db.QueryMatches(hints) - require.NoError(h.t, err) - - // Assert that the number of matches is exactly the number of provided - // hints. - require.Lenf(h.t, matches, len(hints), "expected: %d matches, got: %d", - len(hints), len(matches)) - - // Assert that all the matches correspond to a session with the - // expected policy. - for _, match := range matches { - matchPolicy := match.SessionInfo.Policy - require.Equal(h.t, expPolicy, matchPolicy) - } -} - // addTower adds a tower found at `addr` to the client. func (h *testHarness) addTower(addr *lnwire.NetAddress) { h.t.Helper() @@ -1038,6 +952,92 @@ func (s *serverHarness) restart(op func(cfg *wtserver.Config)) { op(s.cfg) } +// waitForUpdates blocks until the breach hints provided all appear in the +// watchtower's database or the timeout expires. This is used to test that the +// client in fact sends the updates to the server, even if it is offline. +func (s *serverHarness) waitForUpdates(hints []blob.BreachHint, + timeout time.Duration) { + + s.t.Helper() + + // If no breach hints are provided, we will wait out the full timeout to + // assert that no updates appear. + wantUpdates := len(hints) > 0 + + hintSet := make(map[blob.BreachHint]struct{}) + for _, hint := range hints { + hintSet[hint] = struct{}{} + } + + require.Lenf(s.t, hints, len(hintSet), "breach hints are not unique, "+ + "list-len: %d set-len: %d", len(hints), len(hintSet)) + + // Closure to assert the server's matches are consistent with the hint + // set. 
+ serverHasHints := func(matches []wtdb.Match) bool { + if len(hintSet) != len(matches) { + return false + } + + for _, match := range matches { + _, ok := hintSet[match.Hint] + require.Truef(s.t, ok, "match %v in db is not in "+ + "hint set", match.Hint) + } + + return true + } + + failTimeout := time.After(timeout) + for { + select { + case <-time.After(time.Second): + matches, err := s.db.QueryMatches(hints) + require.NoError(s.t, err, "unable to query for hints") + + if wantUpdates && serverHasHints(matches) { + return + } + + if wantUpdates { + s.t.Logf("Received %d/%d\n", len(matches), + len(hints)) + } + + case <-failTimeout: + matches, err := s.db.QueryMatches(hints) + require.NoError(s.t, err, "unable to query for hints") + require.Truef(s.t, serverHasHints(matches), "breach "+ + "hints not received, only got %d/%d", + len(matches), len(hints)) + return + } + } +} + +// assertUpdatesForPolicy queries the server db for matches using the provided +// breach hints, then asserts that each match has a session with the expected +// policy. +func (s *serverHarness) assertUpdatesForPolicy(hints []blob.BreachHint, + expPolicy wtpolicy.Policy) { + + // Query for matches on the provided hints. + matches, err := s.db.QueryMatches(hints) + require.NoError(s.t, err) + + // Assert that the number of matches is exactly the number of provided + // hints. + require.Lenf(s.t, matches, len(hints), "expected: %d matches, got: %d", + len(hints), len(matches)) + + // Assert that all the matches correspond to a session with the + // expected policy. + for _, match := range matches { + matchPolicy := match.SessionInfo.Policy + require.Equal(s.t, expPolicy, matchPolicy) + } +} + const ( localBalance = lnwire.MilliSatoshi(100000000) remoteBalance = lnwire.MilliSatoshi(200000000) @@ -1139,7 +1139,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, time.Second) + h.server.waitForUpdates(hints, time.Second) }, }, { @@ -1167,7 +1167,7 @@ var clientTests = []clientTest{ // Ensure that no updates are received by the server, // since they should all be marked as ineligible. - h.waitServerUpdates(nil, time.Second) + h.server.waitForUpdates(nil, time.Second) }, }, { @@ -1199,7 +1199,7 @@ var clientTests = []clientTest{ // Wait for both to be reflected in the server's // database. - h.waitServerUpdates(hints[:numSent], time.Second) + h.server.waitForUpdates(hints[:numSent], time.Second) // Now, restart the server and prevent it from acking // state updates. @@ -1233,7 +1233,7 @@ var clientTests = []clientTest{ // Wait for the committed update to be accepted by the // tower. - h.waitServerUpdates(hints[:numSent], time.Second) + h.server.waitForUpdates(hints[:numSent], time.Second) // Finally, send the rest of the updates and wait for // the tower to receive the remaining states. @@ -1241,7 +1241,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, time.Second) + h.server.waitForUpdates(hints, time.Second) }, }, @@ -1290,7 +1290,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) }, }, { @@ -1341,7 +1341,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. 
- h.waitServerUpdates(hints, 3*time.Second) + h.server.waitForUpdates(hints, 3*time.Second) }, }, { @@ -1383,7 +1383,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, 10*time.Second) + h.server.waitForUpdates(hints, 10*time.Second) }, }, { @@ -1411,7 +1411,7 @@ var clientTests = []clientTest{ // Since the client is unable to create a session, the // server should have no updates. - h.waitServerUpdates(nil, time.Second) + h.server.waitForUpdates(nil, time.Second) // Force quit the client since it has queued backups. h.client.ForceQuit() @@ -1429,11 +1429,13 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // Assert that the server has updates for the clients // most recent policy. - h.assertUpdatesForPolicy(hints, h.clientCfg.Policy) + h.server.assertUpdatesForPolicy( + hints, h.clientCfg.Policy, + ) }, }, { @@ -1461,7 +1463,7 @@ var clientTests = []clientTest{ // Since the client is unable to create a session, the // server should have no updates. - h.waitServerUpdates(nil, time.Second) + h.server.waitForUpdates(nil, time.Second) // Force quit the client since it has queued backups. h.client.ForceQuit() @@ -1480,11 +1482,13 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // Assert that the server has updates for the clients // most recent policy. - h.assertUpdatesForPolicy(hints, h.clientCfg.Policy) + h.server.assertUpdatesForPolicy( + hints, h.clientCfg.Policy, + ) }, }, { @@ -1515,7 +1519,9 @@ var clientTests = []clientTest{ h.backupStates(chanID, 0, numUpdates/2, nil) // Wait for the server to collect the first half. - h.waitServerUpdates(hints[:numUpdates/2], time.Second) + h.server.waitForUpdates( + hints[:numUpdates/2], time.Second, + ) // Stop the client, which should have no more backups. require.NoError(h.t, h.client.Stop()) @@ -1537,11 +1543,11 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // Assert that the server has updates for the client's // original policy. - h.assertUpdatesForPolicy(hints, expPolicy) + h.server.assertUpdatesForPolicy(hints, expPolicy) }, }, { @@ -1575,7 +1581,7 @@ var clientTests = []clientTest{ // Wait for the first half of the updates to be // populated in the server's database. - h.waitServerUpdates(hints[:len(hints)/2], waitTime) + h.server.waitForUpdates(hints[:len(hints)/2], waitTime) // Restart the client, so we can ensure the deduping is // maintained across restarts. @@ -1588,7 +1594,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) }, }, { @@ -1613,7 +1619,7 @@ var clientTests = []clientTest{ // first two. hints := h.advanceChannelN(chanID, numUpdates) h.backupStates(chanID, 0, numUpdates/2, nil) - h.waitServerUpdates(hints[:numUpdates/2], waitTime) + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Fully remove the tower, causing its existing sessions // to be marked inactive. @@ -1622,7 +1628,7 @@ var clientTests = []clientTest{ // Back up the remaining states. 
Since the tower has // been removed, it shouldn't receive any updates. h.backupStates(chanID, numUpdates/2, numUpdates, nil) - h.waitServerUpdates(nil, time.Second) + h.server.waitForUpdates(nil, time.Second) // Re-add the tower. We prevent the tower from acking // session creation to ensure the inactive sessions are @@ -1632,7 +1638,7 @@ var clientTests = []clientTest{ }) h.addTower(h.server.addr) - h.waitServerUpdates(nil, time.Second) + h.server.waitForUpdates(nil, time.Second) // Finally, allow the tower to ack session creation, // allowing the state updates to be sent through the new @@ -1641,7 +1647,7 @@ var clientTests = []clientTest{ cfg.NoAckCreateSession = false }) - h.waitServerUpdates(hints[numUpdates/2:], waitTime) + h.server.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, { @@ -1669,7 +1675,7 @@ var clientTests = []clientTest{ // Back up 4 of the 5 states for the negotiated session. h.backupStates(chanID, 0, maxUpdates-1, nil) - h.waitServerUpdates(hints[:maxUpdates-1], waitTime) + h.server.waitForUpdates(hints[:maxUpdates-1], waitTime) // Now, restart the tower and prevent it from acking any // new sessions. We do this here as once the last slot @@ -1684,7 +1690,7 @@ var clientTests = []clientTest{ // the final state. We'll only wait for the first five // states to arrive at the tower. h.backupStates(chanID, maxUpdates-1, numUpdates, nil) - h.waitServerUpdates(hints[:maxUpdates], waitTime) + h.server.waitForUpdates(hints[:maxUpdates], waitTime) // Finally, stop the client which will continue to // attempt session negotiation since it has one more @@ -1721,7 +1727,7 @@ var clientTests = []clientTest{ // Wait for the first half of the updates to be // populated in the server's database. - h.waitServerUpdates(hints[:len(hints)/2], waitTime) + h.server.waitForUpdates(hints[:len(hints)/2], waitTime) // Stop the server. h.server.stop() @@ -1756,7 +1762,7 @@ var clientTests = []clientTest{ h.backupStates(chanID, numUpdates/2, maxUpdates, nil) // Assert that the server does receive the updates. - h.waitServerUpdates(hints[:maxUpdates], waitTime) + h.server.waitForUpdates(hints[:maxUpdates], waitTime) }, }, { @@ -1890,7 +1896,7 @@ var clientTests = []clientTest{ // considered closable when channel 0 is closed. hints := h.advanceChannelN(0, numUpdates) h.backupStates(0, 0, numUpdates, nil) - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // We expect only 1 session to have updates for this // channel. @@ -1930,7 +1936,7 @@ var clientTests = []clientTest{ hints = h.advanceChannelN(1, numUpdates) h.backupStates(1, 0, numUpdates, nil) - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // Determine the ID of the session of interest. sessionIDs = h.relevantSessions(1) @@ -1963,7 +1969,7 @@ var clientTests = []clientTest{ // Fill up only half of the session updates. hints = h.advanceChannelN(2, numUpdates) h.backupStates(2, 0, numUpdates/2, nil) - h.waitServerUpdates(hints[:numUpdates/2], waitTime) + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Determine the ID of the session of interest. sessionIDs = h.relevantSessions(2) @@ -1987,7 +1993,7 @@ var clientTests = []clientTest{ hints = h.advanceChannelN(3, numUpdates) h.backupStates(3, 0, numUpdates, nil) - h.waitServerUpdates(hints, waitTime) + h.server.waitForUpdates(hints, waitTime) // Close it. h.closeChannel(3, 1) @@ -2093,7 +2099,7 @@ var clientTests = []clientTest{ // Wait for the updates to be populated in the server's // database. 
- h.waitServerUpdates(hints[:numUpdates/2], waitTime) + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Now stop the client and reset its database. require.NoError(h.t, h.client.Stop()) @@ -2113,7 +2119,7 @@ var clientTests = []clientTest{ h.backupStates(chanID, numUpdates/2, numUpdates, nil) // Show that the server does get the remaining updates. - h.waitServerUpdates(hints[numUpdates/2:], waitTime) + h.server.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, { @@ -2158,7 +2164,7 @@ var clientTests = []clientTest{ h.backupStates(chanID, numUpdates/2, numUpdates, nil) // Assert that the server does receive ALL the updates. - h.waitServerUpdates(hints[0:numUpdates], waitTime) + h.server.waitForUpdates(hints[0:numUpdates], waitTime) }, }, } From d8af5fcc9261c5127b45dbe4e549e26ffd1a1747 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 18 Jul 2023 14:21:24 +0200 Subject: [PATCH 03/13] wtclient: ensure that `waitForUpdates` has no race condition on timeout Enforce that the fail timeout in `waitForUpdates` is greater than the ping timeout. --- watchtower/wtclient/client_test.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 1507029e2..5e76475e1 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -988,6 +988,9 @@ func (s *serverHarness) waitForUpdates(hints []blob.BreachHint, return true } + require.Truef(s.t, timeout.Seconds() > 1, "timeout must be set to "+ + "greater than 1 second") + failTimeout := time.After(timeout) for { select { @@ -1139,7 +1142,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.server.waitForUpdates(hints, time.Second) + h.server.waitForUpdates(hints, waitTime) }, }, { @@ -1167,7 +1170,7 @@ var clientTests = []clientTest{ // Ensure that no updates are received by the server, // since they should all be marked as ineligible. - h.server.waitForUpdates(nil, time.Second) + h.server.waitForUpdates(nil, waitTime) }, }, { @@ -1199,7 +1202,7 @@ var clientTests = []clientTest{ // Wait for both to be reflected in the server's // database. - h.server.waitForUpdates(hints[:numSent], time.Second) + h.server.waitForUpdates(hints[:numSent], waitTime) // Now, restart the server and prevent it from acking // state updates. @@ -1233,7 +1236,7 @@ var clientTests = []clientTest{ // Wait for the committed update to be accepted by the // tower. - h.server.waitForUpdates(hints[:numSent], time.Second) + h.server.waitForUpdates(hints[:numSent], waitTime) // Finally, send the rest of the updates and wait for // the tower to receive the remaining states. @@ -1241,7 +1244,7 @@ var clientTests = []clientTest{ // Wait for all the updates to be populated in the // server's database. - h.server.waitForUpdates(hints, time.Second) + h.server.waitForUpdates(hints, waitTime) }, }, @@ -1411,7 +1414,7 @@ var clientTests = []clientTest{ // Since the client is unable to create a session, the // server should have no updates. - h.server.waitForUpdates(nil, time.Second) + h.server.waitForUpdates(nil, waitTime) // Force quit the client since it has queued backups. h.client.ForceQuit() @@ -1463,7 +1466,7 @@ var clientTests = []clientTest{ // Since the client is unable to create a session, the // server should have no updates. - h.server.waitForUpdates(nil, time.Second) + h.server.waitForUpdates(nil, waitTime) // Force quit the client since it has queued backups. 
h.client.ForceQuit() @@ -1519,9 +1522,7 @@ var clientTests = []clientTest{ h.backupStates(chanID, 0, numUpdates/2, nil) // Wait for the server to collect the first half. - h.server.waitForUpdates( - hints[:numUpdates/2], time.Second, - ) + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Stop the client, which should have no more backups. require.NoError(h.t, h.client.Stop()) @@ -1628,7 +1629,7 @@ var clientTests = []clientTest{ // Back up the remaining states. Since the tower has // been removed, it shouldn't receive any updates. h.backupStates(chanID, numUpdates/2, numUpdates, nil) - h.server.waitForUpdates(nil, time.Second) + h.server.waitForUpdates(nil, waitTime) // Re-add the tower. We prevent the tower from acking // session creation to ensure the inactive sessions are @@ -1638,7 +1639,7 @@ var clientTests = []clientTest{ }) h.addTower(h.server.addr) - h.server.waitForUpdates(nil, time.Second) + h.server.waitForUpdates(nil, waitTime) // Finally, allow the tower to ack session creation, // allowing the state updates to be sent through the new From feb35e65d82e3da3845e8139ab9b976fb5ce7f8a Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 09:39:52 +0200 Subject: [PATCH 04/13] wtclient: show that a client can switch towers This commit adds a new watchtower client test to demonstrate that a client is able to successfully switch to a new tower and continue backing up updates to that new tower. --- watchtower/wtclient/client_test.go | 45 ++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 5e76475e1..8f07a4707 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2168,6 +2168,51 @@ var clientTests = []clientTest{ h.server.waitForUpdates(hints[0:numUpdates], waitTime) }, }, + { + // Assert that the client is able to switch to a new tower if + // the primary one goes down. + name: "switch to new tower", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions and back a few of + // them up to the main tower. + hints := h.advanceChannelN(chanID, numUpdates) + h.backupStates(chanID, 0, numUpdates/2, nil) + + // Wait for all the backed up updates to be populated in + // the server's database. + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) + + // Now we add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, + ) + server2.start() + h.addTower(server2.addr) + + // Stop the old tower and remove it from the client. + h.server.stop() + h.removeTower(h.server.addr.IdentityKey, nil) + + // Back up the remaining states. + h.backupStates(chanID, numUpdates/2, numUpdates, nil) + + // Assert that the new tower has the remaining states. + server2.waitForUpdates(hints[numUpdates/2:], waitTime) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup From c4fec3ebc95201f30c8e67195dda7ed28dc74088 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 3 Feb 2023 12:24:40 +0200 Subject: [PATCH 05/13] wtclient: show that bound tasks are not replayed This commit demonstrates a bug. 
It shows that if backup tasks have been bound to a session with a tower (ie, the tasks are in the session's pendingQueue) and then the tower is removed and a new one is added, then the tasks from the pendingQueue are _not_ replayed to the session with the new tower. Instead, they are silently lost. This will be fixed in an upcoming commit. --- watchtower/wtclient/client_test.go | 92 ++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 8f07a4707..ad5682fee 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -952,6 +952,27 @@ func (s *serverHarness) restart(op func(cfg *wtserver.Config)) { op(s.cfg) } +// assertUpdatesNotFound asserts that a set of hints are not found in the +// server's DB. +func (s *serverHarness) assertUpdatesNotFound(hints []blob.BreachHint) { + s.t.Helper() + + hintSet := make(map[blob.BreachHint]struct{}) + for _, hint := range hints { + hintSet[hint] = struct{}{} + } + + time.Sleep(time.Second) + + matches, err := s.db.QueryMatches(hints) + require.NoError(s.t, err, "unable to query for hints") + + for _, match := range matches { + _, ok := hintSet[match.Hint] + require.False(s.t, ok, "breach hint was found in server DB") + } +} + // waitForUpdates blocks until the breach hints provided all appear in the // watchtower's database or the timeout expires. This is used to test that the // client in fact sends the updates to the server, even if it is offline. @@ -2213,6 +2234,77 @@ var clientTests = []clientTest{ server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, + { + // Show that if a client switches to a new tower _after_ backup + // tasks have been bound to the session with the first old tower + // then these updates are _not_ replayed onto the new tower. + // This is a bug that will be fixed in a future commit. + name: "switch to new tower after tasks are bound", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions and back a few of + // them up to the main tower. + hints := h.advanceChannelN(chanID, numUpdates) + h.backupStates(chanID, 0, numUpdates/2, nil) + + // Wait for all these updates to be populated in the + // server's database. + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) + + // Now stop the server. + h.server.stop() + + // Back up a few more tasks. This will bind the + // backup tasks to the session with the old server. + h.backupStates(chanID, numUpdates/2, numUpdates-1, nil) + + // Now we add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, + ) + server2.start() + h.addTower(server2.addr) + + // Now we can remove the old one. + err := wait.Predicate(func() bool { + err := h.client.RemoveTower( + h.server.addr.IdentityKey, nil, + ) + + return err == nil + }, waitTime) + require.NoError(h.t, err) + + // Back up the final task. + h.backupStates(chanID, numUpdates-1, numUpdates, nil) + + // Show that only the latest backup is backed up to the + // server and that the ones backed up while no tower was + // online were _not_ backed up to either server. This is + // a bug that will be fixed in a future commit. 
+ server2.waitForUpdates( + hints[numUpdates-1:], time.Second, + ) + server2.assertUpdatesNotFound( + hints[numUpdates/2 : numUpdates-1], + ) + h.server.assertUpdatesNotFound( + hints[numUpdates/2 : numUpdates-1], + ) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup From 25c4d3f1f76bad48fd83082a9b346a8671d90e32 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 09:45:23 +0200 Subject: [PATCH 06/13] wtclient: make sessionQueueSet thread safe In preparation for an upcoming commit where multiple threads will have access to the TowerClient sessionQueueSet, we turn it into a thread safe struct. --- watchtower/wtclient/client.go | 16 ++++----- watchtower/wtclient/session_queue.go | 54 +++++++++++++++++++++++++--- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 85d7e19f2..98e930e4f 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -306,7 +306,7 @@ type TowerClient struct { negotiator SessionNegotiator candidateTowers TowerCandidateIterator candidateSessions map[wtdb.SessionID]*ClientSession - activeSessions sessionQueueSet + activeSessions *sessionQueueSet sessionQueue *sessionQueue prevTask *wtdb.BackupID @@ -378,7 +378,7 @@ func New(config *Config) (*TowerClient, error) { log: plog, pipeline: queue, chanCommitHeights: make(map[lnwire.ChannelID]uint64), - activeSessions: make(sessionQueueSet), + activeSessions: newSessionQueueSet(), summaries: chanSummaries, closableSessionQueue: newSessionCloseMinHeap(), statTicker: time.NewTicker(DefaultStatInterval), @@ -1609,7 +1609,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession, func (c *TowerClient) getOrInitActiveQueue(s *ClientSession, updates []wtdb.CommittedUpdate) *sessionQueue { - if sq, ok := c.activeSessions[s.ID]; ok { + if sq, ok := c.activeSessions.Get(s.ID); ok { return sq } @@ -1628,12 +1628,10 @@ func (c *TowerClient) initActiveQueue(s *ClientSession, sq := c.newSessionQueue(s, updates) // Add the session queue as an active session so that we remember to - // stop it on shutdown. - c.activeSessions.Add(sq) - - // Start the queue so that it can be active in processing newly assigned - // tasks or to upload previously committed updates. - sq.Start() + // stop it on shutdown. This method will also start the queue so that it + // can be active in processing newly assigned tasks or to upload + // previously committed updates. + c.activeSessions.AddAndStart(sq) return sq } diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index aa06709ee..d33f94f5b 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -682,19 +682,63 @@ func (q *sessionQueue) signalUntilShutdown() { // sessionQueueSet maintains a mapping of SessionIDs to their corresponding // sessionQueue. -type sessionQueueSet map[wtdb.SessionID]*sessionQueue +type sessionQueueSet struct { + queues map[wtdb.SessionID]*sessionQueue + mu sync.Mutex +} -// Add inserts a sessionQueue into the sessionQueueSet. -func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) { - (*s)[*sessionQueue.ID()] = sessionQueue +// newSessionQueueSet constructs a new sessionQueueSet. +func newSessionQueueSet() *sessionQueueSet { + return &sessionQueueSet{ + queues: make(map[wtdb.SessionID]*sessionQueue), + } +} + +// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it. 
+func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) { + s.mu.Lock() + defer s.mu.Unlock() + + s.queues[*sessionQueue.ID()] = sessionQueue + + sessionQueue.Start() +} + +// StopAndRemove stops the given session queue and removes it from the +// sessionQueueSet. +func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) { + s.mu.Lock() + defer s.mu.Unlock() + + queue, ok := s.queues[id] + if !ok { + return + } + + queue.Stop() + + delete(s.queues, id) +} + +// Get fetches and returns the sessionQueue with the given ID. +func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) { + s.mu.Lock() + defer s.mu.Unlock() + + q, ok := s.queues[id] + + return q, ok } // ApplyAndWait executes the nil-adic function returned from getApply for each // sessionQueue in the set in parallel, then waits for all of them to finish // before returning to the caller. func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) { + s.mu.Lock() + defer s.mu.Unlock() + var wg sync.WaitGroup - for _, sessionq := range *s { + for _, sessionq := range s.queues { wg.Add(1) go func(sq *sessionQueue) { defer wg.Done() From 449d6b5500f7b98130901fc5dc051e03d0f89093 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 24 May 2023 16:02:58 +0200 Subject: [PATCH 07/13] wtclient: fix handleStaleTower comment --- watchtower/wtclient/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 98e930e4f..b22f1aa64 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -1739,10 +1739,10 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, } } -// handleNewTower handles a request for an existing tower to be removed. If none -// of the tower's sessions have pending updates, then they will become inactive -// and removed as candidates. If the active session queue corresponds to any of -// these sessions, a new one will be negotiated. +// handleStaleTower handles a request for an existing tower to be removed. If +// none of the tower's sessions have pending updates, then they will become +// inactive and removed as candidates. If the active session queue corresponds +// to any of these sessions, a new one will be negotiated. func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { // We'll load the tower before potentially removing it in order to // retrieve its ID within the database. From 552ef4bf8189a246eed28806407633c8400f3723 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 25 May 2023 10:48:10 +0200 Subject: [PATCH 08/13] wtclient: replay pending tasks on sessionQueue stop This commit does a few things: - First, it gives the sessionQueue access to the TowerClient task pipeline so that it can replay backup tasks onto the pipeline on Stop. - Given that the above is done, the ForceQuit functionality of the sessionQueue and TowerClient can be removed. - The bug demonstrated in a prior commit is now fixed due to the above changes. 
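The replay described in the first point can be pictured with the following sketch. It is only an illustration: `TaskPipeline` and `QueueBackupID` come from this patch, while `quit`, `wg` and the `unackedTasks` helper are hypothetical stand-ins for the sessionQueue's actual internals:

    // Stop gracefully shuts down the sessionQueue and, rather than force
    // quitting, hands every update that was bound to this session but never
    // acked by the tower back to the client's task pipeline.
    func (q *sessionQueue) Stop() error {
    	close(q.quit)
    	q.wg.Wait()

    	// unackedTasks is a hypothetical helper returning the BackupIDs of
    	// all committed-but-unacked updates held by this session.
    	for _, id := range q.unackedTasks() {
    		if err := q.cfg.TaskPipeline.QueueBackupID(id); err != nil {
    			return err
    		}
    	}

    	return nil
    }

Because the tasks are re-queued rather than dropped, a later session, possibly negotiated with a different tower, can pick them up, which is what fixes the bug demonstrated in the earlier commit.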
--- htlcswitch/interfaces.go | 5 +- server.go | 2 - watchtower/wtclient/client.go | 184 +++++++++---------------- watchtower/wtclient/client_test.go | 118 +++------------- watchtower/wtclient/session_queue.go | 195 ++++++++++++++++----------- 5 files changed, 200 insertions(+), 304 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 32d80ac11..8f418edfe 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -252,9 +252,8 @@ type TowerClient interface { // BackupState initiates a request to back up a particular revoked // state. If the method returns nil, the backup is guaranteed to be - // successful unless the tower is unavailable and client is force quit, - // or the justice transaction would create dust outputs when trying to - // abide by the negotiated policy. + // successful unless the justice transaction would create dust outputs + // when trying to abide by the negotiated policy. BackupState(chanID *lnwire.ChannelID, stateNum uint64) error } diff --git a/server.go b/server.go index e28d8a757..5b421ae2f 100644 --- a/server.go +++ b/server.go @@ -1569,7 +1569,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr, ChainHash: *s.cfg.ActiveNetParams.GenesisHash, MinBackoff: 10 * time.Second, MaxBackoff: 5 * time.Minute, - ForceQuitDelay: wtclient.DefaultForceQuitDelay, MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, }) if err != nil { @@ -1603,7 +1602,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr, ChainHash: *s.cfg.ActiveNetParams.GenesisHash, MinBackoff: 10 * time.Second, MaxBackoff: 5 * time.Minute, - ForceQuitDelay: wtclient.DefaultForceQuitDelay, MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, }) if err != nil { diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index b22f1aa64..716db74aa 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -42,11 +42,6 @@ const ( // metrics about the client's operation. DefaultStatInterval = time.Minute - // DefaultForceQuitDelay specifies the default duration after which the - // client should abandon any pending updates or session negotiations - // before terminating. - DefaultForceQuitDelay = 10 * time.Second - // DefaultSessionCloseRange is the range over which we will generate a // random number of blocks to delay closing a session after its last // channel has been closed. @@ -138,9 +133,8 @@ type Client interface { // BackupState initiates a request to back up a particular revoked // state. If the method returns nil, the backup is guaranteed to be - // successful unless the client is force quit, or the justice - // transaction would create dust outputs when trying to abide by the - // negotiated policy. + // successful unless the justice transaction would create dust outputs + // when trying to abide by the negotiated policy. BackupState(chanID *lnwire.ChannelID, stateNum uint64) error // Start initializes the watchtower client, allowing it process requests @@ -151,10 +145,6 @@ type Client interface { // so, it will attempt to flush the pipeline and deliver any queued // states to the tower before exiting. Stop() error - - // ForceQuit will forcibly shutdown the watchtower client. Calling this - // may lead to queued states being dropped. - ForceQuit() } // Config provides the TowerClient with access to the resources it requires to @@ -213,13 +203,6 @@ type Config struct { // the tower must be watching to monitor for breaches. 
ChainHash chainhash.Hash - // ForceQuitDelay is the duration after attempting to shutdown that the - // client will automatically abort any pending backups if an unclean - // shutdown is detected. If the value is less than or equal to zero, a - // call to Stop may block indefinitely. The client can always be - // ForceQuit externally irrespective of the chosen parameter. - ForceQuitDelay time.Duration - // ReadTimeout is the duration we will wait during a read before // breaking out of a blocking read. If the value is less than or equal // to zero, the default will be used instead. @@ -295,7 +278,6 @@ type staleTowerMsg struct { type TowerClient struct { started sync.Once stopped sync.Once - forced sync.Once cfg *Config @@ -323,9 +305,8 @@ type TowerClient struct { newTowers chan *newTowerMsg staleTowers chan *staleTowerMsg - wg sync.WaitGroup - quit chan struct{} - forceQuit chan struct{} + wg sync.WaitGroup + quit chan struct{} } // Compile-time constraint to ensure *TowerClient implements the Client @@ -385,7 +366,6 @@ func New(config *Config) (*TowerClient, error) { stats: new(ClientStats), newTowers: make(chan *newTowerMsg), staleTowers: make(chan *staleTowerMsg), - forceQuit: make(chan struct{}), quit: make(chan struct{}), } @@ -697,58 +677,44 @@ func (c *TowerClient) Stop() error { c.stopped.Do(func() { c.log.Debugf("Stopping watchtower client") - // 1. To ensure we don't hang forever on shutdown due to - // unintended failures, we'll delay a call to force quit the - // pipeline if a ForceQuitDelay is specified. This will have no - // effect if the pipeline shuts down cleanly before the delay - // fires. - // - // For full safety, this can be set to 0 and wait out - // indefinitely. However for mobile clients which may have a - // limited amount of time to exit before the background process - // is killed, this offers a way to ensure the process - // terminates. - if c.cfg.ForceQuitDelay > 0 { - time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit) - } - - // 2. Shutdown the backup queue, which will prevent any further - // updates from being accepted. In practice, the links should be - // shutdown before the client has been stopped, so all updates - // would have been added prior. - err := c.pipeline.Stop() + // 1. Stop the session negotiator. + err := c.negotiator.Stop() if err != nil { returnErr = err } - // 3. Once the backup queue has shutdown, wait for the main - // dispatcher to exit. The backup queue will signal it's - // completion to the dispatcher, which releases the wait group - // after all tasks have been assigned to session queues. + // 2. Stop the backup dispatcher and any other goroutines. close(c.quit) c.wg.Wait() - // 4. Since all valid tasks have been assigned to session - // queues, we no longer need to negotiate sessions. - err = c.negotiator.Stop() - if err != nil { - returnErr = err + // 3. If there was a left over 'prevTask' from the backup + // dispatcher, replay that onto the pipeline. + if c.prevTask != nil { + err = c.pipeline.QueueBackupID(c.prevTask) + if err != nil { + returnErr = err + } } - c.log.Debugf("Waiting for active session queues to finish "+ - "draining, stats: %s", c.stats) - - // 5. Shutdown all active session queues in parallel. These will - // exit once all updates have been acked by the watchtower. + // 4. Shutdown all active session queues in parallel. These will + // exit once all unhandled updates have been replayed to the + // task pipeline. 
c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { - return s.Stop + return func() { + err := s.Stop() + if err != nil { + c.log.Errorf("could not stop session "+ + "queue: %s: %v", s.ID(), err) + + returnErr = err + } + } }) - // Skip log if force quitting. - select { - case <-c.forceQuit: - return - default: + // 5. Shutdown the backup queue, which will prevent any further + // updates from being accepted. + if err = c.pipeline.Stop(); err != nil { + returnErr = err } c.log.Debugf("Client successfully stopped, stats: %s", c.stats) @@ -757,43 +723,6 @@ func (c *TowerClient) Stop() error { return returnErr } -// ForceQuit idempotently initiates an unclean shutdown of the watchtower -// client. This should only be executed if Stop is unable to exit cleanly. -func (c *TowerClient) ForceQuit() { - c.forced.Do(func() { - c.log.Infof("Force quitting watchtower client") - - // 1. Shutdown the backup queue, which will prevent any further - // updates from being accepted. In practice, the links should be - // shutdown before the client has been stopped, so all updates - // would have been added prior. - err := c.pipeline.Stop() - if err != nil { - c.log.Errorf("could not stop backup queue: %v", err) - } - - // 2. Once the backup queue has shutdown, wait for the main - // dispatcher to exit. The backup queue will signal it's - // completion to the dispatcher, which releases the wait group - // after all tasks have been assigned to session queues. - close(c.forceQuit) - c.wg.Wait() - - // 3. Since all valid tasks have been assigned to session - // queues, we no longer need to negotiate sessions. - c.negotiator.Stop() - - // 4. Force quit all active session queues in parallel. These - // will exit once all updates have been acked by the watchtower. - c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { - return s.ForceQuit - }) - - c.log.Infof("Watchtower client unclean shutdown complete, "+ - "stats: %s", c.stats) - }) -} - // RegisterChannel persistently initializes any channel-dependent parameters // within the client. This should be called during link startup to ensure that // the client is able to support the link during operation. @@ -832,7 +761,6 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { // BackupState initiates a request to back up a particular revoked state. If the // method returns nil, the backup is guaranteed to be successful unless the: -// - client is force quit, // - justice transaction would create dust outputs when trying to abide by the // negotiated policy, or // - breached outputs contain too little value to sweep at the target sweep @@ -955,9 +883,6 @@ func (c *TowerClient) handleChannelCloses(chanSub subscribe.Subscription) { err) } - case <-c.forceQuit: - return - case <-c.quit: return } @@ -1085,9 +1010,6 @@ func (c *TowerClient) handleClosableSessions( } } - case <-c.forceQuit: - return - case <-c.quit: return } @@ -1246,8 +1168,7 @@ func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error { // backupDispatcher processes events coming from the taskPipeline and is // responsible for detecting when the client needs to renegotiate a session to -// fulfill continuing demand. The event loop exits after all tasks have been -// received from the upstream taskPipeline, or the taskPipeline is force quit. +// fulfill continuing demand. The event loop exits if the TowerClient is quit. // // NOTE: This method MUST be run as a goroutine. 
func (c *TowerClient) backupDispatcher() { @@ -1297,7 +1218,7 @@ func (c *TowerClient) backupDispatcher() { case msg := <-c.staleTowers: msg.errChan <- c.handleStaleTower(msg) - case <-c.forceQuit: + case <-c.quit: return } @@ -1381,6 +1302,9 @@ func (c *TowerClient) backupDispatcher() { // of its corresponding candidate sessions as inactive. case msg := <-c.staleTowers: msg.errChan <- c.handleStaleTower(msg) + + case <-c.quit: + return } } } @@ -1422,7 +1346,7 @@ func (c *TowerClient) processTask(task *wtdb.BackupID) { // sessionQueue will be removed if accepting the task left the sessionQueue in // an exhausted state. func (c *TowerClient) taskAccepted(task *wtdb.BackupID, - newStatus reserveStatus) { + newStatus sessionQueueStatus) { c.log.Infof("Queued %v successfully for session %v", task, c.sessionQueue.ID()) @@ -1436,11 +1360,11 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID, switch newStatus { // The sessionQueue still has capacity after accepting this task. - case reserveAvailable: + case sessionQueueAvailable: // The sessionQueue is full after accepting this task, so we will need // to request a new one before proceeding. - case reserveExhausted: + case sessionQueueExhausted: c.stats.sessionExhausted() c.log.Debugf("Session %s exhausted", c.sessionQueue.ID()) @@ -1456,16 +1380,17 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID, // the state the was in *before* the task was rejected. The client's prevTask // will cache the task if the sessionQueue was exhausted beforehand, and nil // the sessionQueue to find a new session. If the sessionQueue was not -// exhausted, the client marks the task as ineligible, as this implies we -// couldn't construct a valid justice transaction given the session's policy. +// exhausted and not shutting down, the client marks the task as ineligible, as +// this implies we couldn't construct a valid justice transaction given the +// session's policy. func (c *TowerClient) taskRejected(task *wtdb.BackupID, - curStatus reserveStatus) { + curStatus sessionQueueStatus) { switch curStatus { // The sessionQueue has available capacity but the task was rejected, // this indicates that the task was ineligible for backup. - case reserveAvailable: + case sessionQueueAvailable: c.stats.taskIneligible() c.log.Infof("Ignoring ineligible %v", task) @@ -1491,7 +1416,7 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID, // The sessionQueue rejected the task because it is full, we will stash // this task and try to add it to the next available sessionQueue. - case reserveExhausted: + case sessionQueueExhausted: c.stats.sessionExhausted() c.log.Debugf("Session %v exhausted, %v queued for next session", @@ -1501,6 +1426,18 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID, // once a new session queue is available. c.sessionQueue = nil c.prevTask = task + + // The sessionQueue rejected the task because it is shutting down. We + // will stash this task and try to add it to the next available + // sessionQueue. + case sessionQueueShuttingDown: + c.log.Debugf("Session %v is shutting down, %v queued for "+ + "next session", c.sessionQueue.ID(), task) + + // Cache the task that we pulled off, so that we can process it + // once a new session queue is available. 
+ c.sessionQueue = nil + c.prevTask = task } } @@ -1600,6 +1537,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession, MaxBackoff: c.cfg.MaxBackoff, Log: c.log, BuildBreachRetribution: c.cfg.BuildBreachRetribution, + TaskPipeline: c.pipeline, }, updates) } @@ -1790,6 +1728,14 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { } for sessionID := range sessions { delete(c.candidateSessions, sessionID) + + // Shutdown the session so that any pending updates are + // replayed back onto the main task pipeline. + err = c.activeSessions.StopAndRemove(sessionID) + if err != nil { + c.log.Errorf("could not stop session %s: %w", sessionID, + err) + } } // If our active session queue corresponds to the stale tower, we'll diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index ad5682fee..7166381af 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -488,7 +488,6 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { WriteTimeout: timeout, MinBackoff: time.Millisecond, MaxBackoff: time.Second, - ForceQuitDelay: 10 * time.Second, SessionCloseRange: 1, MaxTasksInMemQueue: 2, } @@ -508,7 +507,9 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { } h.startClient() - t.Cleanup(h.client.ForceQuit) + t.Cleanup(func() { + require.NoError(t, h.client.Stop()) + }) h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance) if !cfg.noRegisterChan0 { @@ -952,27 +953,6 @@ func (s *serverHarness) restart(op func(cfg *wtserver.Config)) { op(s.cfg) } -// assertUpdatesNotFound asserts that a set of hints are not found in the -// server's DB. -func (s *serverHarness) assertUpdatesNotFound(hints []blob.BreachHint) { - s.t.Helper() - - hintSet := make(map[blob.BreachHint]struct{}) - for _, hint := range hints { - hintSet[hint] = struct{}{} - } - - time.Sleep(time.Second) - - matches, err := s.db.QueryMatches(hints) - require.NoError(s.t, err, "unable to query for hints") - - for _, match := range matches { - _, ok := hintSet[match.Hint] - require.False(s.t, ok, "breach hint was found in server DB") - } -} - // waitForUpdates blocks until the breach hints provided all appear in the // watchtower's database or the timeout expires. This is used to test that the // client in fact sends the updates to the server, even if it is offline. @@ -1238,12 +1218,9 @@ var clientTests = []clientTest{ h.backupState(chanID, numSent, nil) numSent++ - // Force quit the client to abort the state updates it - // has queued. The sleep ensures that the session queues - // have enough time to commit the state updates before - // the client is killed. - time.Sleep(time.Second) - h.client.ForceQuit() + // Stop the client to abort the state updates it has + // queued. + require.NoError(h.t, h.client.Stop()) // Restart the server and allow it to ack the updates // after the client retransmits the unacked update. @@ -1437,8 +1414,8 @@ var clientTests = []clientTest{ // server should have no updates. h.server.waitForUpdates(nil, waitTime) - // Force quit the client since it has queued backups. - h.client.ForceQuit() + // Stop the client since it has queued backups. + require.NoError(h.t, h.client.Stop()) // Restart the server and allow it to ack session // creation. @@ -1489,8 +1466,8 @@ var clientTests = []clientTest{ // server should have no updates. h.server.waitForUpdates(nil, waitTime) - // Force quit the client since it has queued backups. - h.client.ForceQuit() + // Stop the client since it has queued backups. 
+ require.NoError(h.t, h.client.Stop()) // Restart the server and allow it to ack session // creation. @@ -1672,56 +1649,6 @@ var clientTests = []clientTest{ h.server.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, - { - // Asserts that the client's force quite delay will properly - // shutdown the client if it is unable to completely drain the - // task pipeline. - name: "force unclean shutdown", - cfg: harnessCfg{ - localBalance: localBalance, - remoteBalance: remoteBalance, - policy: wtpolicy.Policy{ - TxPolicy: defaultTxPolicy, - MaxUpdates: 5, - }, - }, - fn: func(h *testHarness) { - const ( - chanID = 0 - numUpdates = 6 - maxUpdates = 5 - ) - - // Advance the channel to create all states. - hints := h.advanceChannelN(chanID, numUpdates) - - // Back up 4 of the 5 states for the negotiated session. - h.backupStates(chanID, 0, maxUpdates-1, nil) - h.server.waitForUpdates(hints[:maxUpdates-1], waitTime) - - // Now, restart the tower and prevent it from acking any - // new sessions. We do this here as once the last slot - // is exhausted the client will attempt to renegotiate. - h.server.restart(func(cfg *wtserver.Config) { - cfg.NoAckCreateSession = true - }) - - // Back up the remaining two states. Once the first is - // processed, the session will be exhausted but the - // client won't be able to renegotiate a session for - // the final state. We'll only wait for the first five - // states to arrive at the tower. - h.backupStates(chanID, maxUpdates-1, numUpdates, nil) - h.server.waitForUpdates(hints[:maxUpdates], waitTime) - - // Finally, stop the client which will continue to - // attempt session negotiation since it has one more - // state to process. After the force quite delay - // expires, the client should force quite itself and - // allow the test to complete. - h.server.stop() - }, - }, { // Assert that if a client changes the address for a server and // then tries to back up updates then the client will switch to @@ -1937,7 +1864,7 @@ var clientTests = []clientTest{ require.False(h.t, h.isSessionClosable(sessionIDs[0])) // Restart the client. - h.client.ForceQuit() + require.NoError(h.t, h.client.Stop()) h.startClient() // The session should now have been marked as closable. @@ -2176,9 +2103,8 @@ var clientTests = []clientTest{ h.backupStates(chanID, 0, numUpdates/2, nil) - // Restart the Client (force quit). And also now start - // the server. - h.client.ForceQuit() + // Restart the Client. And also now start the server. + require.NoError(h.t, h.client.Stop()) h.server.start() h.startClient() @@ -2237,8 +2163,7 @@ var clientTests = []clientTest{ { // Show that if a client switches to a new tower _after_ backup // tasks have been bound to the session with the first old tower - // then these updates are _not_ replayed onto the new tower. - // This is a bug that will be fixed in a future commit. + // then these updates are replayed onto the new tower. name: "switch to new tower after tasks are bound", cfg: harnessCfg{ localBalance: localBalance, @@ -2290,18 +2215,11 @@ var clientTests = []clientTest{ // Back up the final task. h.backupStates(chanID, numUpdates-1, numUpdates, nil) - // Show that only the latest backup is backed up to the - // server and that the ones backed up while no tower was - // online were _not_ backed up to either server. This is - // a bug that will be fixed in a future commit. + // Show that all the backups (the ones added while no + // towers were online and the one added after adding the + // second tower) are backed up to the second tower. 
server2.waitForUpdates( - hints[numUpdates-1:], time.Second, - ) - server2.assertUpdatesNotFound( - hints[numUpdates/2 : numUpdates-1], - ) - h.server.assertUpdatesNotFound( - hints[numUpdates/2 : numUpdates-1], + hints[numUpdates/2:numUpdates], waitTime, ) }, }, diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index d33f94f5b..1377ceb57 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -16,17 +16,21 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtwire" ) -// reserveStatus is an enum that signals how full a particular session is. -type reserveStatus uint8 +// sessionQueueStatus is an enum that signals how full a particular session is. +type sessionQueueStatus uint8 const ( - // reserveAvailable indicates that the session has space for at least - // one more backup. - reserveAvailable reserveStatus = iota + // sessionQueueAvailable indicates that the session has space for at + // least one more backup. + sessionQueueAvailable sessionQueueStatus = iota - // reserveExhausted indicates that all slots in the session have been - // allocated. - reserveExhausted + // sessionQueueExhausted indicates that all slots in the session have + // been allocated. + sessionQueueExhausted + + // sessionQueueShuttingDown indicates that the session queue is + // shutting down and so is no longer accepting any more backups. + sessionQueueShuttingDown ) // sessionQueueConfig bundles the resources required by the sessionQueue to @@ -62,6 +66,10 @@ type sessionQueueConfig struct { // certain revoked commitment height. BuildBreachRetribution BreachRetributionBuilder + // TaskPipeline is a pipeline which the sessionQueue should use to send + // any unhandled tasks on shutdown of the queue. + TaskPipeline *DiskOverflowQueue[*wtdb.BackupID] + // DB provides access to the client's stable storage. DB DB @@ -85,10 +93,8 @@ type sessionQueueConfig struct { // sessionQueue implements a reliable queue that will encrypt and send accepted // backups to the watchtower specified in the config's ClientSession. Calling -// Quit will attempt to perform a clean shutdown by receiving an ACK from the -// tower for all pending backups before exiting. The clean shutdown can be -// aborted by using ForceQuit, which will attempt to shut down the queue -// immediately. +// Stop will attempt to perform a clean shutdown replaying any un-committed +// pending updates to the TowerClient's main task pipeline. type sessionQueue struct { started sync.Once stopped sync.Once @@ -109,9 +115,8 @@ type sessionQueue struct { retryBackoff time.Duration - quit chan struct{} - forceQuit chan struct{} - shutdown chan struct{} + quit chan struct{} + wg sync.WaitGroup } // newSessionQueue initializes a fresh sessionQueue. @@ -133,8 +138,6 @@ func newSessionQueue(cfg *sessionQueueConfig, seqNum: cfg.ClientSession.SeqNum, retryBackoff: cfg.MinBackoff, quit: make(chan struct{}), - forceQuit: make(chan struct{}), - shutdown: make(chan struct{}), } sq.queueCond = sync.NewCond(&sq.queueMtx) @@ -151,41 +154,77 @@ func newSessionQueue(cfg *sessionQueueConfig, // backups. func (q *sessionQueue) Start() { q.started.Do(func() { + q.wg.Add(1) go q.sessionManager() }) } // Stop idempotently stops the sessionQueue by initiating a clean shutdown that // will clear all pending tasks in the queue before returning to the caller. 
-func (q *sessionQueue) Stop() { +func (q *sessionQueue) Stop() error { + var returnErr error q.stopped.Do(func() { q.log.Debugf("SessionQueue(%s) stopping ...", q.ID()) close(q.quit) - q.signalUntilShutdown() - // Skip log if we also force quit. - select { - case <-q.forceQuit: + shutdown := make(chan struct{}) + go func() { + for { + select { + case <-time.After(time.Millisecond): + q.queueCond.Signal() + case <-shutdown: + return + } + } + }() + + q.wg.Wait() + close(shutdown) + + // Now, for any task in the pending queue that we have not yet + // created a CommittedUpdate for, re-add the task to the main + // task pipeline. + updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID()) + if err != nil { + returnErr = err return - default: } + unAckedUpdates := make(map[wtdb.BackupID]bool) + for _, update := range updates { + unAckedUpdates[update.BackupID] = true + } + + // Push any task that was on the pending queue that there is + // not yet a committed update for back to the main task + // pipeline. + q.queueCond.L.Lock() + for q.pendingQueue.Len() > 0 { + next := q.pendingQueue.Front() + q.pendingQueue.Remove(next) + + //nolint:forcetypeassert + task := next.Value.(*backupTask) + + if unAckedUpdates[task.id] { + continue + } + + err := q.cfg.TaskPipeline.QueueBackupID(&task.id) + if err != nil { + log.Errorf("could not re-queue backup task: "+ + "%v", err) + continue + } + } + q.queueCond.L.Unlock() + q.log.Debugf("SessionQueue(%s) stopped", q.ID()) }) -} -// ForceQuit idempotently aborts any clean shutdown in progress and returns to -// he caller after all lingering goroutines have spun down. -func (q *sessionQueue) ForceQuit() { - q.forced.Do(func() { - q.log.Infof("SessionQueue(%s) force quitting...", q.ID()) - - close(q.forceQuit) - q.signalUntilShutdown() - - q.log.Infof("SessionQueue(%s) force quit", q.ID()) - }) + return returnErr } // ID returns the wtdb.SessionID for the queue, which can be used to uniquely @@ -196,10 +235,28 @@ func (q *sessionQueue) ID() *wtdb.SessionID { // AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's // tower. The session will only be accepted if the queue is not already -// exhausted and the task is successfully bound to the ClientSession. -func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { +// exhausted or shutting down and the task is successfully bound to the +// ClientSession. +func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) { + // Exit early if the queue has started shutting down. + select { + case <-q.quit: + return sessionQueueShuttingDown, false + default: + } + q.queueCond.L.Lock() + // There is a chance that sessionQueue started shutting down between + // the last quit channel check and waiting for the lock. So check one + // more time here. + select { + case <-q.quit: + q.queueCond.L.Unlock() + return sessionQueueShuttingDown, false + default: + } + numPending := uint32(q.pendingQueue.Len()) maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+ @@ -207,14 +264,14 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { q.ID(), task.id, q.seqNum, numPending, maxUpdates) // Examine the current reserve status of the session queue. - curStatus := q.reserveStatus() + curStatus := q.status() switch curStatus { // The session queue is exhausted, and cannot accept the task because it // is full. Reject the task such that it can be tried against a // different session. 
- case reserveExhausted: + case sessionQueueExhausted: q.queueCond.L.Unlock() return curStatus, false @@ -224,7 +281,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { // tried again. // // TODO(conner): queue backups and retry with different session params. - case reserveAvailable: + case sessionQueueAvailable: err := task.bindSession( &q.cfg.ClientSession.ClientSessionBody, q.cfg.BuildBreachRetribution, @@ -244,7 +301,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { // Finally, compute the session's *new* reserve status. This will be // used by the client to determine if it can continue using this session // queue, or if it should negotiate a new one. - newStatus := q.reserveStatus() + newStatus := q.status() q.queueCond.L.Unlock() q.queueCond.Signal() @@ -255,7 +312,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { // sessionManager is the primary event loop for the sessionQueue, and is // responsible for encrypting and sending accepted tasks to the tower. func (q *sessionQueue) sessionManager() { - defer close(q.shutdown) + defer q.wg.Done() for { q.queueCond.L.Lock() @@ -266,12 +323,6 @@ func (q *sessionQueue) sessionManager() { select { case <-q.quit: - if q.commitQueue.Len() == 0 && - q.pendingQueue.Len() == 0 { - q.queueCond.L.Unlock() - return - } - case <-q.forceQuit: q.queueCond.L.Unlock() return default: @@ -279,12 +330,9 @@ func (q *sessionQueue) sessionManager() { } q.queueCond.L.Unlock() - // Exit immediately if a force quit has been requested. If - // either of the queues still has state updates to send to the - // tower, we may never exit in the above case if we are unable - // to reach the tower for some reason. + // Exit immediately if the sessionQueue has been stopped. select { - case <-q.forceQuit: + case <-q.quit: return default: } @@ -333,7 +381,7 @@ func (q *sessionQueue) drainBackups() { q.increaseBackoff() select { case <-time.After(q.retryBackoff): - case <-q.forceQuit: + case <-q.quit: } return } @@ -366,7 +414,7 @@ func (q *sessionQueue) drainBackups() { q.increaseBackoff() select { case <-time.After(q.retryBackoff): - case <-q.forceQuit: + case <-q.quit: } return } @@ -388,7 +436,7 @@ func (q *sessionQueue) drainBackups() { // when we will do so. select { case <-time.After(time.Millisecond): - case <-q.forceQuit: + case <-q.quit: return } } @@ -635,21 +683,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, return nil } -// reserveStatus returns a reserveStatus indicating whether the sessionQueue can -// accept another task. reserveAvailable is returned when a task can be -// accepted, and reserveExhausted is returned if the all slots in the session -// have been allocated. +// status returns a sessionQueueStatus indicating whether the sessionQueue can +// accept another task. sessionQueueAvailable is returned when a task can be +// accepted, and sessionQueueExhausted is returned if the all slots in the +// session have been allocated. // // NOTE: This method MUST be called with queueCond's exclusive lock held. 
-func (q *sessionQueue) reserveStatus() reserveStatus { +func (q *sessionQueue) status() sessionQueueStatus { numPending := uint32(q.pendingQueue.Len()) maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates) if uint32(q.seqNum)+numPending < maxUpdates { - return reserveAvailable + return sessionQueueAvailable } - return reserveExhausted + return sessionQueueExhausted } @@ -667,19 +715,6 @@ func (q *sessionQueue) increaseBackoff() { } } -// signalUntilShutdown strobes the sessionQueue's condition variable until the -// main event loop exits. -func (q *sessionQueue) signalUntilShutdown() { - for { - select { - case <-time.After(time.Millisecond): - q.queueCond.Signal() - case <-q.shutdown: - return - } - } -} - // sessionQueueSet maintains a mapping of SessionIDs to their corresponding // sessionQueue. type sessionQueueSet struct { @@ -706,18 +741,18 @@ func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) { // StopAndRemove stops the given session queue and removes it from the // sessionQueueSet. -func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) { +func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error { s.mu.Lock() defer s.mu.Unlock() queue, ok := s.queues[id] if !ok { - return + return nil } - queue.Stop() - delete(s.queues, id) + + return queue.Stop() } // Get fetches and returns the sessionQueue with the given ID. From af1506bff1a4e132b5afe8be57c75d8b19a5b18d Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 3 Feb 2023 12:32:14 +0200 Subject: [PATCH 09/13] wtclient: demo "tower has unacked updates" bug In this commit, we demonstrate the situation where a client has persisted CommittedUpdates but has not yet recieved Acks for them from the tower. If this happens and the client attempts to remove the tower, it will with the "tower has unacked updates" error. --- watchtower/wtclient/client_test.go | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 7166381af..8fee27a4f 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2223,6 +2223,82 @@ var clientTests = []clientTest{ ) }, }, + { + // Assert that a client is unable to remove a tower if there + // are persisted un-acked updates. This is a bug that will be + // fixed in a future commit. + name: "cant remove due to un-acked updates (no client restart)", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions and back a few of + // them up to the main tower. + hints := h.advanceChannelN(chanID, numUpdates) + h.backupStates(chanID, 0, numUpdates/2, nil) + + // Wait for all these updates to be populated in the + // server's database. + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) + + // Now stop the server and restart it with the + // NoAckUpdates set to true. + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) + + // Back up the remaining tasks. This will bind the + // backup tasks to the session with the server. The + // client will also persist the updates. + h.backupStates(chanID, numUpdates/2, numUpdates, nil) + + tower, err := h.clientDB.LoadTower( + h.server.addr.IdentityKey, + ) + require.NoError(h.t, err) + + // Wait till the updates have been persisted. 
+ err = wait.Predicate(func() bool { + var numCommittedUpdates int + countUpdates := func(_ *wtdb.ClientSession, + update *wtdb.CommittedUpdate) { + + numCommittedUpdates++ + } + + _, err := h.clientDB.ListClientSessions( + &tower.ID, wtdb.WithPerCommittedUpdate( + countUpdates, + ), + ) + require.NoError(h.t, err) + + return numCommittedUpdates == 1 + + }, waitTime) + require.NoError(h.t, err) + + // Now attempt to remove the tower. This will fail due + // the tower having "un-acked" updates. This is a bug + // that will be fixed in a future commit. + err = h.client.RemoveTower( + h.server.addr.IdentityKey, nil, + ) + require.ErrorContains( + h.t, err, "tower has unacked updates", + ) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup From cfb81c00a6d7331804a82a5d70ce3acc318a4724 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 13 Feb 2023 13:42:26 +0200 Subject: [PATCH 10/13] wtclient: tower with unacked updates cant be removed after restart This commit demonstrates that if a session has persisted committed updates and the client is restarted _after_ these committed updates have been persisted, then removing the tower will fail. --- watchtower/wtclient/client_test.go | 82 ++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 8fee27a4f..1b54d87cc 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2299,6 +2299,88 @@ var clientTests = []clientTest{ ) }, }, + { + // Assert that a client is _unable_ to remove a tower if there + // are persisted un-acked updates _and_ the client is restarted + // before the tower is removed. + name: "cant remove due to un-acked updates (with client " + + "restart)", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: defaultTxPolicy, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 5 + chanID = 0 + ) + + // Generate numUpdates retributions. + hints := h.advanceChannelN(chanID, numUpdates) + + // Back half of the states up. + h.backupStates(chanID, 0, numUpdates/2, nil) + + // Wait for the updates to be populated in the server's + // database. + h.server.waitForUpdates(hints[:numUpdates/2], waitTime) + + // Now stop the server and restart it with the + // NoAckUpdates set to true. + h.server.restart(func(cfg *wtserver.Config) { + cfg.NoAckUpdates = true + }) + + // Back up the remaining tasks. This will bind the + // backup tasks to the session with the server. The + // client will also attempt to get the ack for one + // update which will cause a CommittedUpdate to be + // persisted. + h.backupStates(chanID, numUpdates/2, numUpdates, nil) + + tower, err := h.clientDB.LoadTower( + h.server.addr.IdentityKey, + ) + require.NoError(h.t, err) + + // Wait till the updates have been persisted. + err = wait.Predicate(func() bool { + var numCommittedUpdates int + countUpdates := func(_ *wtdb.ClientSession, + update *wtdb.CommittedUpdate) { + + numCommittedUpdates++ + } + + _, err := h.clientDB.ListClientSessions( + &tower.ID, wtdb.WithPerCommittedUpdate( + countUpdates, + ), + ) + require.NoError(h.t, err) + + return numCommittedUpdates == 1 + + }, waitTime) + require.NoError(h.t, err) + + // Now restart the client. This ensures that the + // updates are no longer in the pending queue. + require.NoError(h.t, h.client.Stop()) + h.startClient() + + // Now try removing the tower. 
This will fail due to + // the persisted CommittedUpdate. + err = h.client.RemoveTower( + h.server.addr.IdentityKey, nil, + ) + require.Error(h.t, err, "tower has unacked updates") + }, + }, } // TestClient executes the client test suite, asserting the ability to backup From c432899bf92a94e0d38d1f40fdd55f1603d832ec Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 10:13:59 +0200 Subject: [PATCH 11/13] watchtower: add DeleteCommittedUpdate DB method Add a new DeleteCommittedUpdate method to the wtdb In preparation for an upcoming commit that will replay committed updates from one session to another. --- watchtower/wtclient/interface.go | 4 ++++ watchtower/wtdb/client_db.go | 36 +++++++++++++++++++++++++++++++ watchtower/wtdb/client_db_test.go | 28 +++++++++++++++++++++++- watchtower/wtmock/client_db.go | 31 ++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/watchtower/wtclient/interface.go b/watchtower/wtclient/interface.go index a2a20c987..0f7f1b539 100644 --- a/watchtower/wtclient/interface.go +++ b/watchtower/wtclient/interface.go @@ -135,6 +135,10 @@ type DB interface { // GetDBQueue returns a BackupID Queue instance under the given name // space. GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID] + + // DeleteCommittedUpdate deletes the committed update belonging to the + // given session and with the given sequence number from the db. + DeleteCommittedUpdate(id *wtdb.SessionID, seqNum uint16) error } // AuthDialer connects to a remote node using an authenticated transport, such diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 9491015d6..41d80587d 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -2073,6 +2073,42 @@ func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] { ) } +// DeleteCommittedUpdate deletes the committed update with the given sequence +// number from the given session. +func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error { + return kvdb.Update(c.db, func(tx kvdb.RwTx) error { + sessions := tx.ReadWriteBucket(cSessionBkt) + if sessions == nil { + return ErrUninitializedDB + } + + sessionBkt := sessions.NestedReadWriteBucket(id[:]) + if sessionBkt == nil { + return fmt.Errorf("session bucket %s not found", + id.String()) + } + + // If the commits sub-bucket doesn't exist, there can't possibly + // be a corresponding update to remove. + sessionCommits := sessionBkt.NestedReadWriteBucket( + cSessionCommits, + ) + if sessionCommits == nil { + return ErrCommittedUpdateNotFound + } + + var seqNumBuf [2]byte + byteOrder.PutUint16(seqNumBuf[:], seqNum) + + if sessionCommits.Get(seqNumBuf[:]) == nil { + return ErrCommittedUpdateNotFound + } + + // Remove the corresponding committed update. + return sessionCommits.Delete(seqNumBuf[:]) + }, func() {}) +} + // putChannelToSessionMapping adds the given session ID to a channel's // cChanSessions bucket. 
func putChannelToSessionMapping(chanDetails kvdb.RwBucket, diff --git a/watchtower/wtdb/client_db_test.go b/watchtower/wtdb/client_db_test.go index 80557f1f8..5bfb4dab5 100644 --- a/watchtower/wtdb/client_db_test.go +++ b/watchtower/wtdb/client_db_test.go @@ -195,6 +195,15 @@ func (h *clientDBHarness) ackUpdate(id *wtdb.SessionID, seqNum uint16, require.ErrorIs(h.t, err, expErr) } +func (h *clientDBHarness) deleteCommittedUpdate(id *wtdb.SessionID, + seqNum uint16, expErr error) { + + h.t.Helper() + + err := h.db.DeleteCommittedUpdate(id, seqNum) + require.ErrorIs(h.t, err, expErr) +} + func (h *clientDBHarness) markChannelClosed(id lnwire.ChannelID, blockHeight uint32, expErr error) []wtdb.SessionID { @@ -567,7 +576,8 @@ func testChanSummaries(h *clientDBHarness) { h.registerChan(chanID, expPkScript, wtdb.ErrChannelAlreadyRegistered) } -// testCommitUpdate tests the behavior of CommitUpdate, ensuring that they can +// testCommitUpdate tests the behavior of CommitUpdate and +// DeleteCommittedUpdate. func testCommitUpdate(h *clientDBHarness) { const blobType = blob.TypeAltruistCommit @@ -648,6 +658,22 @@ func testCommitUpdate(h *clientDBHarness) { *update1, *update2, }, nil) + + // We will now also test that the DeleteCommittedUpdates method also + // works. + // First, try to delete a committed update that does not exist. + h.deleteCommittedUpdate( + &session.ID, update4.SeqNum, wtdb.ErrCommittedUpdateNotFound, + ) + + // Now delete an existing committed update and ensure that it succeeds. + h.deleteCommittedUpdate(&session.ID, update1.SeqNum, nil) + h.assertUpdates(session.ID, []wtdb.CommittedUpdate{ + *update2, + }, nil) + + h.deleteCommittedUpdate(&session.ID, update2.SeqNum, nil) + h.assertUpdates(session.ID, []wtdb.CommittedUpdate{}, nil) } // testMarkChannelClosed asserts the behaviour of MarkChannelClosed. diff --git a/watchtower/wtmock/client_db.go b/watchtower/wtmock/client_db.go index 60838ab42..f5625d35b 100644 --- a/watchtower/wtmock/client_db.go +++ b/watchtower/wtmock/client_db.go @@ -586,6 +586,37 @@ func (m *ClientDB) GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID] { return q } +// DeleteCommittedUpdate deletes the committed update with the given sequence +// number from the given session. +func (m *ClientDB) DeleteCommittedUpdate(id *wtdb.SessionID, + seqNum uint16) error { + + m.mu.Lock() + defer m.mu.Unlock() + + // Fail if session doesn't exist. + session, ok := m.activeSessions[*id] + if !ok { + return wtdb.ErrClientSessionNotFound + } + + // Retrieve the committed update, failing if none is found. + updates := m.committedUpdates[session.ID] + for i, update := range updates { + if update.SeqNum != seqNum { + continue + } + + // Remove the committed update from "disk". + updates = append(updates[:i], updates[i+1:]...) + m.committedUpdates[session.ID] = updates + + return nil + } + + return wtdb.ErrCommittedUpdateNotFound +} + // ListClosableSessions fetches and returns the IDs for all sessions marked as // closable. func (m *ClientDB) ListClosableSessions() (map[wtdb.SessionID]uint32, error) { From 3ea67983b5eb6e6ae65f268b5a3b7aced455b9dd Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 13 Feb 2023 15:49:28 +0200 Subject: [PATCH 12/13] wtclient: replay un-acked updates onto pipeline In this commit, the bugs demonstrated in prior commits are fixed. 
In the case where an session has persisted a CommittedUpdate and the tower is being removed, the session will now replay that update on to the main task pipeline so that it can be backed up using a different session. --- watchtower/wtclient/client.go | 73 +++++++++++++++++----------- watchtower/wtclient/client_test.go | 49 +++++++++++++------ watchtower/wtclient/session_queue.go | 29 ++++++++++- 3 files changed, 105 insertions(+), 46 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 716db74aa..f3b8d3307 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -701,7 +701,7 @@ func (c *TowerClient) Stop() error { // task pipeline. c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { return func() { - err := s.Stop() + err := s.Stop(false) if err != nil { c.log.Errorf("could not stop session "+ "queue: %s: %v", s.ID(), err) @@ -1689,37 +1689,20 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { return err } - // We'll first update our in-memory state followed by our persisted - // state, with the stale tower. The removal of the tower address from - // the in-memory state will fail if the address is currently being used - // for a session negotiation. - err = c.candidateTowers.RemoveCandidate(dbTower.ID, msg.addr) + // If an address was provided, then we're only meant to remove the + // address from the tower. + if msg.addr != nil { + return c.removeTowerAddr(dbTower, msg.addr) + } + + // Otherwise, the tower should no longer be used for future session + // negotiations and backups. First, we'll update our in-memory state + // with the stale tower. + err = c.candidateTowers.RemoveCandidate(dbTower.ID, nil) if err != nil { return err } - if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil { - // If the persisted state update fails, re-add the address to - // our in-memory state. - tower, newTowerErr := NewTowerFromDBTower(dbTower) - if newTowerErr != nil { - log.Errorf("could not create new in-memory tower: %v", - newTowerErr) - } else { - c.candidateTowers.AddCandidate(tower) - } - - return err - } - - // If an address was provided, then we're only meant to remove the - // address from the tower, so there's nothing left for us to do. - if msg.addr != nil { - return nil - } - - // Otherwise, the tower should no longer be used for future session - // negotiations and backups. pubKey := msg.pubKey.SerializeCompressed() sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID) if err != nil { @@ -1748,6 +1731,40 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { } } + // Finally, we will update our persisted state with the stale tower. + return c.cfg.DB.RemoveTower(msg.pubKey, nil) +} + +// removeTowerAddr removes the given address from the tower. +func (c *TowerClient) removeTowerAddr(tower *wtdb.Tower, addr net.Addr) error { + if addr == nil { + return fmt.Errorf("an address must be provided") + } + + // We'll first update our in-memory state followed by our persisted + // state with the stale tower. The removal of the tower address from + // the in-memory state will fail if the address is currently being used + // for a session negotiation. + err := c.candidateTowers.RemoveCandidate(tower.ID, addr) + if err != nil { + return err + } + + err = c.cfg.DB.RemoveTower(tower.IdentityKey, addr) + if err != nil { + // If the persisted state update fails, re-add the address to + // our in-memory state. 
+ tower, newTowerErr := NewTowerFromDBTower(tower) + if newTowerErr != nil { + log.Errorf("could not create new in-memory tower: %v", + newTowerErr) + } else { + c.candidateTowers.AddCandidate(tower) + } + + return err + } + return nil } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 1b54d87cc..5cb998c9a 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2224,10 +2224,11 @@ var clientTests = []clientTest{ }, }, { - // Assert that a client is unable to remove a tower if there - // are persisted un-acked updates. This is a bug that will be - // fixed in a future commit. - name: "cant remove due to un-acked updates (no client restart)", + // Assert that a client is able to remove a tower if there are + // persisted un-acked updates. This tests the case where the + // client is not-restarted meaning that the un-acked updates + // will still be in the pending queue. + name: "can remove due to un-acked updates (no client restart)", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, @@ -2288,23 +2289,29 @@ var clientTests = []clientTest{ }, waitTime) require.NoError(h.t, err) - // Now attempt to remove the tower. This will fail due - // the tower having "un-acked" updates. This is a bug - // that will be fixed in a future commit. + // Now remove the tower. err = h.client.RemoveTower( h.server.addr.IdentityKey, nil, ) - require.ErrorContains( - h.t, err, "tower has unacked updates", + require.NoError(h.t, err) + + // Add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, ) + server2.start() + h.addTower(server2.addr) + + // Now we assert that the backups are backed up to the + // new tower. + server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, { - // Assert that a client is _unable_ to remove a tower if there - // are persisted un-acked updates _and_ the client is restarted + // Assert that a client is able to remove a tower if there are + // persisted un-acked updates _and_ the client is restarted // before the tower is removed. - name: "cant remove due to un-acked updates (with client " + - "restart)", + name: "can remove tower with un-acked updates (with restart)", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, @@ -2373,12 +2380,22 @@ var clientTests = []clientTest{ require.NoError(h.t, h.client.Stop()) h.startClient() - // Now try removing the tower. This will fail due to - // the persisted CommittedUpdate. + // Now remove the tower. err = h.client.RemoveTower( h.server.addr.IdentityKey, nil, ) - require.Error(h.t, err, "tower has unacked updates") + require.NoError(h.t, err) + + // Add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, + ) + server2.start() + h.addTower(server2.addr) + + // Now we assert that the backups are backed up to the + // new tower. + server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, } diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index 1377ceb57..27c36c6fe 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -161,7 +161,10 @@ func (q *sessionQueue) Start() { // Stop idempotently stops the sessionQueue by initiating a clean shutdown that // will clear all pending tasks in the queue before returning to the caller. -func (q *sessionQueue) Stop() error { +// The final param should only be set to true if this is the last time that +// this session will be used. 
Otherwise, during normal shutdown, the final param +// should be false. +func (q *sessionQueue) Stop(final bool) error { var returnErr error q.stopped.Do(func() { q.log.Debugf("SessionQueue(%s) stopping ...", q.ID()) @@ -195,6 +198,28 @@ func (q *sessionQueue) Stop() error { unAckedUpdates := make(map[wtdb.BackupID]bool) for _, update := range updates { unAckedUpdates[update.BackupID] = true + + if !final { + continue + } + + err := q.cfg.TaskPipeline.QueueBackupID( + &update.BackupID, + ) + if err != nil { + log.Errorf("could not re-queue %s: %v", + update.BackupID, err) + continue + } + + err = q.cfg.DB.DeleteCommittedUpdate( + q.ID(), update.SeqNum, + ) + if err != nil { + log.Errorf("could not delete committed "+ + "update %d for session %s", + update.SeqNum, q.ID()) + } } // Push any task that was on the pending queue that there is @@ -752,7 +777,7 @@ func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error { delete(s.queues, id) - return queue.Stop() + return queue.Stop(true) } // Get fetches and returns the sessionQueue with the given ID. From 7bf07d8875df6e0c98912fe4e34429149f5fc1a7 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 23 Mar 2023 10:16:00 +0200 Subject: [PATCH 13/13] docs: add release note for 6895 --- docs/release-notes/release-notes-0.17.0.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/release-notes/release-notes-0.17.0.md b/docs/release-notes/release-notes-0.17.0.md index 81d46839b..ff4711e7a 100644 --- a/docs/release-notes/release-notes-0.17.0.md +++ b/docs/release-notes/release-notes-0.17.0.md @@ -18,6 +18,8 @@ * [Replace in-mem task pipeline with a disk-overflow queue](https://github.com/lightningnetwork/lnd/pull/7380) +* [Replay pending and un-acked updates onto the main task pipeline if a tower + is being removed](https://github.com/lightningnetwork/lnd/pull/6895) * [Add defaults](https://github.com/lightningnetwork/lnd/pull/7771) to the wtclient and watchtower config structs and use these to populate the defaults
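The release note above ("Replay pending and un-acked updates onto the main task pipeline if a tower is being removed") summarises the behaviour introduced across patches 09-12. What follows is a minimal, self-contained Go sketch of that shutdown/replay flow, included purely as an illustration: every identifier in it (backupID, committedUpdate, sessionStore, taskPipeline, stopSessionQueue) is a simplified stand-in invented for this sketch, not an lnd API. It only assumes the logic shown in sessionQueue.Stop(final) above: committed updates are replayed onto the pipeline and deleted only on a final shutdown, while pending tasks that never produced a committed update are always handed back.

package main

import "fmt"

// backupID identifies a single revoked state to back up. It stands in for
// wtdb.BackupID.
type backupID struct {
	chanID string
	height uint64
}

// committedUpdate stands in for a persisted wtdb.CommittedUpdate: an update
// that was committed to a session but never acked by the tower.
type committedUpdate struct {
	seqNum uint16
	id     backupID
}

// taskPipeline stands in for the client's main disk-overflow task pipeline.
type taskPipeline struct {
	tasks []backupID
}

func (p *taskPipeline) queueBackupID(id backupID) {
	p.tasks = append(p.tasks, id)
}

// sessionStore stands in for the DB calls the real queue uses:
// FetchSessionCommittedUpdates and DeleteCommittedUpdate.
type sessionStore struct {
	committed []committedUpdate
}

func (s *sessionStore) fetchCommitted() []committedUpdate {
	return s.committed
}

func (s *sessionStore) deleteCommitted(seqNum uint16) {
	for i, u := range s.committed {
		if u.seqNum == seqNum {
			s.committed = append(s.committed[:i], s.committed[i+1:]...)
			return
		}
	}
}

// stopSessionQueue mirrors the two replay steps of sessionQueue.Stop(final):
// on a final shutdown, persisted committed updates are replayed onto the
// pipeline and removed from the store; pending tasks without a committed
// update are replayed in either case.
func stopSessionQueue(final bool, pending []backupID, db *sessionStore,
	pipeline *taskPipeline) {

	unacked := make(map[backupID]bool)
	for _, u := range db.fetchCommitted() {
		unacked[u.id] = true

		if !final {
			continue
		}

		pipeline.queueBackupID(u.id)
		db.deleteCommitted(u.seqNum)
	}

	for _, task := range pending {
		// Skip tasks already tracked by a committed update; they were
		// either replayed above or intentionally left persisted for a
		// non-final shutdown.
		if unacked[task] {
			continue
		}

		pipeline.queueBackupID(task)
	}
}

func main() {
	db := &sessionStore{
		committed: []committedUpdate{{seqNum: 1, id: backupID{"chan0", 7}}},
	}
	pipeline := &taskPipeline{}
	pending := []backupID{{"chan0", 7}, {"chan0", 8}}

	stopSessionQueue(true, pending, db, pipeline)
	fmt.Println(pipeline.tasks) // [{chan0 7} {chan0 8}]
}

In the real client the final parameter distinguishes a sessionQueue being retired for good (sessionQueueSet.StopAndRemove, as used when removing a tower) from a normal client shutdown, which is why the "tower has unacked updates" failure demonstrated in patches 09 and 10 no longer occurs.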