wtclient: move server specific methods to serverHarness

This commit is contained in:
Elle Mouton
2023-03-23 09:39:04 +02:00
parent 2ed5788de3
commit d979f59331

View File

@@ -729,92 +729,6 @@ func (h *testHarness) recvPayments(id, from, to uint64,
return hints
}
// waitServerUpdates blocks until the breach hints provided all appear in the
// watchtower's database or the timeout expires. This is used to test that the
// client in fact sends the updates to the server, even if it is offline.
func (h *testHarness) waitServerUpdates(hints []blob.BreachHint,
timeout time.Duration) {
h.t.Helper()
// If no breach hints are provided, we will wait out the full timeout to
// assert that no updates appear.
wantUpdates := len(hints) > 0
hintSet := make(map[blob.BreachHint]struct{})
for _, hint := range hints {
hintSet[hint] = struct{}{}
}
require.Lenf(h.t, hints, len(hintSet), "breach hints are not unique, "+
"list-len: %d set-len: %d", len(hints), len(hintSet))
// Closure to assert the server's matches are consistent with the hint
// set.
serverHasHints := func(matches []wtdb.Match) bool {
if len(hintSet) != len(matches) {
return false
}
for _, match := range matches {
_, ok := hintSet[match.Hint]
require.Truef(h.t, ok, "match %v in db is not in "+
"hint set", match.Hint)
}
return true
}
failTimeout := time.After(timeout)
for {
select {
case <-time.After(time.Second):
matches, err := h.server.db.QueryMatches(hints)
require.NoError(h.t, err, "unable to query for hints")
if wantUpdates && serverHasHints(matches) {
return
}
if wantUpdates {
h.t.Logf("Received %d/%d\n", len(matches),
len(hints))
}
case <-failTimeout:
matches, err := h.server.db.QueryMatches(hints)
require.NoError(h.t, err, "unable to query for hints")
require.Truef(h.t, serverHasHints(matches), "breach "+
"hints not received, only got %d/%d",
len(matches), len(hints))
return
}
}
}
// assertUpdatesForPolicy queries the server db for matches using the provided
// breach hints, then asserts that each match has a session with the expected
// policy.
func (h *testHarness) assertUpdatesForPolicy(hints []blob.BreachHint,
expPolicy wtpolicy.Policy) {
// Query for matches on the provided hints.
matches, err := h.server.db.QueryMatches(hints)
require.NoError(h.t, err)
// Assert that the number of matches is exactly the number of provided
// hints.
require.Lenf(h.t, matches, len(hints), "expected: %d matches, got: %d",
len(hints), len(matches))
// Assert that all the matches correspond to a session with the
// expected policy.
for _, match := range matches {
matchPolicy := match.SessionInfo.Policy
require.Equal(h.t, expPolicy, matchPolicy)
}
}
// addTower adds a tower found at `addr` to the client.
func (h *testHarness) addTower(addr *lnwire.NetAddress) {
h.t.Helper()
@@ -1038,6 +952,92 @@ func (s *serverHarness) restart(op func(cfg *wtserver.Config)) {
op(s.cfg)
}
// waitForUpdates blocks until the breach hints provided all appear in the
// watchtower's database or the timeout expires. This is used to test that the
// client in fact sends the updates to the server, even if it is offline.
func (s *serverHarness) waitForUpdates(hints []blob.BreachHint,
timeout time.Duration) {
s.t.Helper()
// If no breach hints are provided, we will wait out the full timeout to
// assert that no updates appear.
wantUpdates := len(hints) > 0
hintSet := make(map[blob.BreachHint]struct{})
for _, hint := range hints {
hintSet[hint] = struct{}{}
}
require.Lenf(s.t, hints, len(hintSet), "breach hints are not unique, "+
"list-len: %d set-len: %d", len(hints), len(hintSet))
// Closure to assert the server's matches are consistent with the hint
// set.
serverHasHints := func(matches []wtdb.Match) bool {
if len(hintSet) != len(matches) {
return false
}
for _, match := range matches {
_, ok := hintSet[match.Hint]
require.Truef(s.t, ok, "match %v in db is not in "+
"hint set", match.Hint)
}
return true
}
failTimeout := time.After(timeout)
for {
select {
case <-time.After(time.Second):
matches, err := s.db.QueryMatches(hints)
require.NoError(s.t, err, "unable to query for hints")
if wantUpdates && serverHasHints(matches) {
return
}
if wantUpdates {
s.t.Logf("Received %d/%d\n", len(matches),
len(hints))
}
case <-failTimeout:
matches, err := s.db.QueryMatches(hints)
require.NoError(s.t, err, "unable to query for hints")
require.Truef(s.t, serverHasHints(matches), "breach "+
"hints not received, only got %d/%d",
len(matches), len(hints))
return
}
}
}
// assertUpdatesForPolicy queries the server db for matches using the provided
// breach hints, then asserts that each match has a session with the expected
// policy.
func (s *serverHarness) assertUpdatesForPolicy(hints []blob.BreachHint,
expPolicy wtpolicy.Policy) {
// Query for matches on the provided hints.
matches, err := s.db.QueryMatches(hints)
require.NoError(s.t, err)
// Assert that the number of matches is exactly the number of provided
// hints.
require.Lenf(s.t, matches, len(hints), "expected: %d matches, got: %d",
len(hints), len(matches))
// Assert that all the matches correspond to a session with the
// expected policy.
for _, match := range matches {
matchPolicy := match.SessionInfo.Policy
require.Equal(s.t, expPolicy, matchPolicy)
}
}
const (
localBalance = lnwire.MilliSatoshi(100000000)
remoteBalance = lnwire.MilliSatoshi(200000000)
@@ -1139,7 +1139,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, time.Second)
h.server.waitForUpdates(hints, time.Second)
},
},
{
@@ -1167,7 +1167,7 @@ var clientTests = []clientTest{
// Ensure that no updates are received by the server,
// since they should all be marked as ineligible.
h.waitServerUpdates(nil, time.Second)
h.server.waitForUpdates(nil, time.Second)
},
},
{
@@ -1199,7 +1199,7 @@ var clientTests = []clientTest{
// Wait for both to be reflected in the server's
// database.
h.waitServerUpdates(hints[:numSent], time.Second)
h.server.waitForUpdates(hints[:numSent], time.Second)
// Now, restart the server and prevent it from acking
// state updates.
@@ -1233,7 +1233,7 @@ var clientTests = []clientTest{
// Wait for the committed update to be accepted by the
// tower.
h.waitServerUpdates(hints[:numSent], time.Second)
h.server.waitForUpdates(hints[:numSent], time.Second)
// Finally, send the rest of the updates and wait for
// the tower to receive the remaining states.
@@ -1241,7 +1241,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, time.Second)
h.server.waitForUpdates(hints, time.Second)
},
},
@@ -1290,7 +1290,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
},
},
{
@@ -1341,7 +1341,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, 3*time.Second)
h.server.waitForUpdates(hints, 3*time.Second)
},
},
{
@@ -1383,7 +1383,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, 10*time.Second)
h.server.waitForUpdates(hints, 10*time.Second)
},
},
{
@@ -1411,7 +1411,7 @@ var clientTests = []clientTest{
// Since the client is unable to create a session, the
// server should have no updates.
h.waitServerUpdates(nil, time.Second)
h.server.waitForUpdates(nil, time.Second)
// Force quit the client since it has queued backups.
h.client.ForceQuit()
@@ -1429,11 +1429,13 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// Assert that the server has updates for the clients
// most recent policy.
h.assertUpdatesForPolicy(hints, h.clientCfg.Policy)
h.server.assertUpdatesForPolicy(
hints, h.clientCfg.Policy,
)
},
},
{
@@ -1461,7 +1463,7 @@ var clientTests = []clientTest{
// Since the client is unable to create a session, the
// server should have no updates.
h.waitServerUpdates(nil, time.Second)
h.server.waitForUpdates(nil, time.Second)
// Force quit the client since it has queued backups.
h.client.ForceQuit()
@@ -1480,11 +1482,13 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// Assert that the server has updates for the clients
// most recent policy.
h.assertUpdatesForPolicy(hints, h.clientCfg.Policy)
h.server.assertUpdatesForPolicy(
hints, h.clientCfg.Policy,
)
},
},
{
@@ -1515,7 +1519,9 @@ var clientTests = []clientTest{
h.backupStates(chanID, 0, numUpdates/2, nil)
// Wait for the server to collect the first half.
h.waitServerUpdates(hints[:numUpdates/2], time.Second)
h.server.waitForUpdates(
hints[:numUpdates/2], time.Second,
)
// Stop the client, which should have no more backups.
require.NoError(h.t, h.client.Stop())
@@ -1537,11 +1543,11 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// Assert that the server has updates for the client's
// original policy.
h.assertUpdatesForPolicy(hints, expPolicy)
h.server.assertUpdatesForPolicy(hints, expPolicy)
},
},
{
@@ -1575,7 +1581,7 @@ var clientTests = []clientTest{
// Wait for the first half of the updates to be
// populated in the server's database.
h.waitServerUpdates(hints[:len(hints)/2], waitTime)
h.server.waitForUpdates(hints[:len(hints)/2], waitTime)
// Restart the client, so we can ensure the deduping is
// maintained across restarts.
@@ -1588,7 +1594,7 @@ var clientTests = []clientTest{
// Wait for all the updates to be populated in the
// server's database.
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
},
},
{
@@ -1613,7 +1619,7 @@ var clientTests = []clientTest{
// first two.
hints := h.advanceChannelN(chanID, numUpdates)
h.backupStates(chanID, 0, numUpdates/2, nil)
h.waitServerUpdates(hints[:numUpdates/2], waitTime)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)
// Fully remove the tower, causing its existing sessions
// to be marked inactive.
@@ -1622,7 +1628,7 @@ var clientTests = []clientTest{
// Back up the remaining states. Since the tower has
// been removed, it shouldn't receive any updates.
h.backupStates(chanID, numUpdates/2, numUpdates, nil)
h.waitServerUpdates(nil, time.Second)
h.server.waitForUpdates(nil, time.Second)
// Re-add the tower. We prevent the tower from acking
// session creation to ensure the inactive sessions are
@@ -1632,7 +1638,7 @@ var clientTests = []clientTest{
})
h.addTower(h.server.addr)
h.waitServerUpdates(nil, time.Second)
h.server.waitForUpdates(nil, time.Second)
// Finally, allow the tower to ack session creation,
// allowing the state updates to be sent through the new
@@ -1641,7 +1647,7 @@ var clientTests = []clientTest{
cfg.NoAckCreateSession = false
})
h.waitServerUpdates(hints[numUpdates/2:], waitTime)
h.server.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
@@ -1669,7 +1675,7 @@ var clientTests = []clientTest{
// Back up 4 of the 5 states for the negotiated session.
h.backupStates(chanID, 0, maxUpdates-1, nil)
h.waitServerUpdates(hints[:maxUpdates-1], waitTime)
h.server.waitForUpdates(hints[:maxUpdates-1], waitTime)
// Now, restart the tower and prevent it from acking any
// new sessions. We do this here as once the last slot
@@ -1684,7 +1690,7 @@ var clientTests = []clientTest{
// the final state. We'll only wait for the first five
// states to arrive at the tower.
h.backupStates(chanID, maxUpdates-1, numUpdates, nil)
h.waitServerUpdates(hints[:maxUpdates], waitTime)
h.server.waitForUpdates(hints[:maxUpdates], waitTime)
// Finally, stop the client which will continue to
// attempt session negotiation since it has one more
@@ -1721,7 +1727,7 @@ var clientTests = []clientTest{
// Wait for the first half of the updates to be
// populated in the server's database.
h.waitServerUpdates(hints[:len(hints)/2], waitTime)
h.server.waitForUpdates(hints[:len(hints)/2], waitTime)
// Stop the server.
h.server.stop()
@@ -1756,7 +1762,7 @@ var clientTests = []clientTest{
h.backupStates(chanID, numUpdates/2, maxUpdates, nil)
// Assert that the server does receive the updates.
h.waitServerUpdates(hints[:maxUpdates], waitTime)
h.server.waitForUpdates(hints[:maxUpdates], waitTime)
},
},
{
@@ -1890,7 +1896,7 @@ var clientTests = []clientTest{
// considered closable when channel 0 is closed.
hints := h.advanceChannelN(0, numUpdates)
h.backupStates(0, 0, numUpdates, nil)
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// We expect only 1 session to have updates for this
// channel.
@@ -1930,7 +1936,7 @@ var clientTests = []clientTest{
hints = h.advanceChannelN(1, numUpdates)
h.backupStates(1, 0, numUpdates, nil)
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// Determine the ID of the session of interest.
sessionIDs = h.relevantSessions(1)
@@ -1963,7 +1969,7 @@ var clientTests = []clientTest{
// Fill up only half of the session updates.
hints = h.advanceChannelN(2, numUpdates)
h.backupStates(2, 0, numUpdates/2, nil)
h.waitServerUpdates(hints[:numUpdates/2], waitTime)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)
// Determine the ID of the session of interest.
sessionIDs = h.relevantSessions(2)
@@ -1987,7 +1993,7 @@ var clientTests = []clientTest{
hints = h.advanceChannelN(3, numUpdates)
h.backupStates(3, 0, numUpdates, nil)
h.waitServerUpdates(hints, waitTime)
h.server.waitForUpdates(hints, waitTime)
// Close it.
h.closeChannel(3, 1)
@@ -2093,7 +2099,7 @@ var clientTests = []clientTest{
// Wait for the updates to be populated in the server's
// database.
h.waitServerUpdates(hints[:numUpdates/2], waitTime)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)
// Now stop the client and reset its database.
require.NoError(h.t, h.client.Stop())
@@ -2113,7 +2119,7 @@ var clientTests = []clientTest{
h.backupStates(chanID, numUpdates/2, numUpdates, nil)
// Show that the server does get the remaining updates.
h.waitServerUpdates(hints[numUpdates/2:], waitTime)
h.server.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
@@ -2158,7 +2164,7 @@ var clientTests = []clientTest{
h.backupStates(chanID, numUpdates/2, numUpdates, nil)
// Assert that the server does receive ALL the updates.
h.waitServerUpdates(hints[0:numUpdates], waitTime)
h.server.waitForUpdates(hints[0:numUpdates], waitTime)
},
},
}