wtclient: spin session key indices if required

If the tower returns CreateSessionCodeAlreadyExists in response to the
client's CreateSession message, advance to the next session key index
and retry, repeating until the tower no longer returns that error. This
allows a client to recover after a data loss incident.
This commit is contained in:
Elle Mouton 2023-03-29 12:40:25 +02:00
parent 24016c35c7
commit ba33f6a697
No known key found for this signature in database
GPG Key ID: D7D916376026F177
3 changed files with 59 additions and 36 deletions

View File

@ -2014,14 +2014,12 @@ var clientTests = []clientTest{
}, },
}, },
{ {
// Demonstrate that the client is unable to upload state updates // Demonstrate that the client is able to recover after
// to a tower if the client deletes its database after already // deleting its database by skipping through key indices until
// having created and started to use a session with a tower. // it gets to one that does not result in the
// This happens because the session key is generated // CreateSessionCodeAlreadyExists error code being returned from
// deterministically and will only be unique for new sessions // the server.
// if the same DB is used. The server therefore rejects these name: "continue after client database deletion",
// updates with the StateUpdateCodeClientBehind error.
name: "demonstrate the StateUpdateCodeClientBehind error",
cfg: harnessCfg{ cfg: harnessCfg{
localBalance: localBalance, localBalance: localBalance,
remoteBalance: remoteBalance, remoteBalance: remoteBalance,
@ -2063,9 +2061,8 @@ var clientTests = []clientTest{
// Attempt to back up the remaining tasks. // Attempt to back up the remaining tasks.
h.backupStates(chanID, numUpdates/2, numUpdates, nil) h.backupStates(chanID, numUpdates/2, numUpdates, nil)
// Show that the server does not get the remaining // Show that the server does get the remaining updates.
// updates. h.waitServerUpdates(hints[numUpdates/2:], waitTime)
h.waitServerUpdates(nil, waitTime)
}, },
}, },
} }

View File

@ -34,4 +34,9 @@ var (
// revoked state because the channel had not been previously registered // revoked state because the channel had not been previously registered
// with the client. // with the client.
ErrUnregisteredChannel = errors.New("channel is not registered") ErrUnregisteredChannel = errors.New("channel is not registered")
// ErrSessionKeyAlreadyUsed indicates that the client attempted to
// create a new session with a tower with a session key that has already
// been used in the past.
ErrSessionKeyAlreadyUsed = errors.New("session key already used")
) )

View File

@ -1,6 +1,7 @@
package wtclient package wtclient
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -272,6 +273,7 @@ retryWithBackoff:
} }
} }
tryNextCandidate:
for { for {
select { select {
case <-n.quit: case <-n.quit:
@ -302,28 +304,39 @@ retryWithBackoff:
n.log.Debugf("Attempting session negotiation with tower=%x", n.log.Debugf("Attempting session negotiation with tower=%x",
towerPub) towerPub)
// Before proceeding, we will reserve a session key index to use var forceNextKey bool
// with this specific tower. If one is already reserved, the for {
// existing index will be returned. // Before proceeding, we will reserve a session key
// index to use with this specific tower. If one is
// already reserved, the existing index will be
// returned.
keyIndex, err := n.cfg.DB.NextSessionKeyIndex( keyIndex, err := n.cfg.DB.NextSessionKeyIndex(
tower.ID, n.cfg.Policy.BlobType, false, tower.ID, n.cfg.Policy.BlobType, forceNextKey,
) )
if err != nil { if err != nil {
n.log.Debugf("Unable to reserve session key index "+ n.log.Debugf("Unable to reserve session key "+
"for tower=%x: %v", towerPub, err) "index for tower=%x: %v", towerPub, err)
goto tryNextCandidate
}
// We'll now attempt the CreateSession dance with the
// tower to get a new session, trying all addresses if
// necessary.
err = n.createSession(tower, keyIndex)
if err == nil {
return
} else if errors.Is(err, ErrSessionKeyAlreadyUsed) {
forceNextKey = true
continue continue
} }
// We'll now attempt the CreateSession dance with the tower to // An unexpected error occurred, update our backoff.
// get a new session, trying all addresses if necessary.
err = n.createSession(tower, keyIndex)
if err != nil {
// An unexpected error occurred, update our backoff.
updateBackoff() updateBackoff()
n.log.Debugf("Session negotiation with tower=%x "+ n.log.Debugf("Session negotiation with tower=%x "+
"failed, trying again -- reason: %v", "failed, trying again -- reason: %v", towerPub,
tower.IdentityKey.SerializeCompressed(), err) err)
goto retryWithBackoff goto retryWithBackoff
} }
@ -360,7 +373,10 @@ func (n *sessionNegotiator) createSession(tower *Tower, keyIndex uint32) error {
err = n.tryAddress(sessionKey, keyIndex, tower, lnAddr) err = n.tryAddress(sessionKey, keyIndex, tower, lnAddr)
tower.Addresses.ReleaseLock(addr) tower.Addresses.ReleaseLock(addr)
switch { switch {
case err == ErrPermanentTowerFailure: case errors.Is(err, ErrSessionKeyAlreadyUsed):
return err
case errors.Is(err, ErrPermanentTowerFailure):
// TODO(conner): report to iterator? can then be reset // TODO(conner): report to iterator? can then be reset
// with restart // with restart
fallthrough fallthrough
@ -454,12 +470,7 @@ func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
} }
switch createSessionReply.Code { switch createSessionReply.Code {
case wtwire.CodeOK, wtwire.CreateSessionCodeAlreadyExists: case wtwire.CodeOK:
// TODO(conner): add last-applied to create session reply to
// handle case where we lose state, session already exists, and
// we want to possibly resume using the session
// TODO(conner): validate reward address // TODO(conner): validate reward address
rewardPkScript := createSessionReply.Data rewardPkScript := createSessionReply.Data
@ -500,6 +511,16 @@ func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
return ErrNegotiatorExiting return ErrNegotiatorExiting
} }
case wtwire.CreateSessionCodeAlreadyExists:
// TODO(conner): use the last-applied in the create session
// reply to handle case where we lose state, session already
// exists, and we want to possibly resume using the session.
// NOTE that this should not be done until the server code
// has been adapted to first check that the CreateSession
// request is for the same blob-type as the initial session.
return ErrSessionKeyAlreadyUsed
// TODO(conner): handle error codes properly // TODO(conner): handle error codes properly
case wtwire.CreateSessionCodeRejectBlobType: case wtwire.CreateSessionCodeRejectBlobType:
return fmt.Errorf("tower rejected blob type: %v", return fmt.Errorf("tower rejected blob type: %v",