Mirror of https://github.com/lightningnetwork/lnd.git (synced 2025-07-28 05:42:37 +02:00)
wtclient: replay pending tasks on sessionQueue stop
This commit does a few things:

- First, it gives the sessionQueue access to the TowerClient task pipeline so
  that it can replay backup tasks onto the pipeline on Stop.
- Given that the above is done, the ForceQuit functionality of the sessionQueue
  and TowerClient can be removed.
- The bug demonstrated in a prior commit is now fixed due to the above changes.
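To make the replay-on-stop idea concrete before the diff, here is a minimal, self-contained Go sketch of the pattern the commit describes: when a session queue stops, any task it accepted but never committed to the tower is pushed back onto the client's shared task pipeline instead of being dropped. The names used here (taskPipeline, backupTask, QueueTask) are illustrative stand-ins and not lnd's actual wtclient types; the real implementation is in the diff below.

// replay_sketch.go — hypothetical sketch, not lnd's actual API.
package main

import "fmt"

type backupTask struct{ id int }

// taskPipeline stands in for the client's shared task pipeline.
type taskPipeline struct{ tasks []backupTask }

func (p *taskPipeline) QueueTask(t backupTask) error {
    p.tasks = append(p.tasks, t)
    return nil
}

type sessionQueue struct {
    pending   []backupTask  // tasks accepted by this session
    committed map[int]bool  // tasks already committed to the tower
    pipeline  *taskPipeline // shared pipeline owned by the client
}

// Stop replays every pending, uncommitted task back onto the pipeline so a
// later session (or a different tower) can pick it up.
func (q *sessionQueue) Stop() error {
    var firstErr error
    for _, t := range q.pending {
        if q.committed[t.id] {
            continue // the tower already has, or will ack, this update
        }
        if err := q.pipeline.QueueTask(t); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

func main() {
    p := &taskPipeline{}
    q := &sessionQueue{
        pending:   []backupTask{{id: 1}, {id: 2}, {id: 3}},
        committed: map[int]bool{2: true},
        pipeline:  p,
    }
    if err := q.Stop(); err != nil {
        fmt.Println("stop error:", err)
    }
    fmt.Println("replayed tasks:", p.tasks) // tasks 1 and 3 go back on the pipeline
}

Because a clean Stop can always hand unfinished work back to the pipeline, there is no longer any need for a separate ForceQuit path that abandons queued states.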
@@ -252,9 +252,8 @@ type TowerClient interface {
 
     // BackupState initiates a request to back up a particular revoked
     // state. If the method returns nil, the backup is guaranteed to be
-    // successful unless the tower is unavailable and client is force quit,
-    // or the justice transaction would create dust outputs when trying to
-    // abide by the negotiated policy.
+    // successful unless the justice transaction would create dust outputs
+    // when trying to abide by the negotiated policy.
     BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
 }
 
@@ -1569,7 +1569,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
         ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
         MinBackoff: 10 * time.Second,
         MaxBackoff: 5 * time.Minute,
-        ForceQuitDelay: wtclient.DefaultForceQuitDelay,
         MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
     })
     if err != nil {
@@ -1603,7 +1602,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
         ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
         MinBackoff: 10 * time.Second,
         MaxBackoff: 5 * time.Minute,
-        ForceQuitDelay: wtclient.DefaultForceQuitDelay,
         MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
     })
     if err != nil {
@@ -42,11 +42,6 @@ const (
     // metrics about the client's operation.
     DefaultStatInterval = time.Minute
 
-    // DefaultForceQuitDelay specifies the default duration after which the
-    // client should abandon any pending updates or session negotiations
-    // before terminating.
-    DefaultForceQuitDelay = 10 * time.Second
-
     // DefaultSessionCloseRange is the range over which we will generate a
     // random number of blocks to delay closing a session after its last
     // channel has been closed.
@@ -138,9 +133,8 @@ type Client interface {
 
     // BackupState initiates a request to back up a particular revoked
     // state. If the method returns nil, the backup is guaranteed to be
-    // successful unless the client is force quit, or the justice
-    // transaction would create dust outputs when trying to abide by the
-    // negotiated policy.
+    // successful unless the justice transaction would create dust outputs
+    // when trying to abide by the negotiated policy.
     BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
 
     // Start initializes the watchtower client, allowing it process requests
@@ -151,10 +145,6 @@ type Client interface {
     // so, it will attempt to flush the pipeline and deliver any queued
     // states to the tower before exiting.
     Stop() error
-
-    // ForceQuit will forcibly shutdown the watchtower client. Calling this
-    // may lead to queued states being dropped.
-    ForceQuit()
 }
 
 // Config provides the TowerClient with access to the resources it requires to
@@ -213,13 +203,6 @@ type Config struct {
     // the tower must be watching to monitor for breaches.
     ChainHash chainhash.Hash
 
-    // ForceQuitDelay is the duration after attempting to shutdown that the
-    // client will automatically abort any pending backups if an unclean
-    // shutdown is detected. If the value is less than or equal to zero, a
-    // call to Stop may block indefinitely. The client can always be
-    // ForceQuit externally irrespective of the chosen parameter.
-    ForceQuitDelay time.Duration
-
     // ReadTimeout is the duration we will wait during a read before
     // breaking out of a blocking read. If the value is less than or equal
    // to zero, the default will be used instead.
@@ -295,7 +278,6 @@ type staleTowerMsg struct {
 type TowerClient struct {
     started sync.Once
     stopped sync.Once
-    forced sync.Once
 
     cfg *Config
 
@@ -323,9 +305,8 @@ type TowerClient struct {
     newTowers chan *newTowerMsg
     staleTowers chan *staleTowerMsg
 
     wg sync.WaitGroup
     quit chan struct{}
-    forceQuit chan struct{}
 }
 
 // Compile-time constraint to ensure *TowerClient implements the Client
@@ -385,7 +366,6 @@ func New(config *Config) (*TowerClient, error) {
         stats: new(ClientStats),
         newTowers: make(chan *newTowerMsg),
         staleTowers: make(chan *staleTowerMsg),
-        forceQuit: make(chan struct{}),
         quit: make(chan struct{}),
     }
 
@@ -697,58 +677,44 @@ func (c *TowerClient) Stop() error {
     c.stopped.Do(func() {
         c.log.Debugf("Stopping watchtower client")
 
-        // 1. To ensure we don't hang forever on shutdown due to
-        // unintended failures, we'll delay a call to force quit the
-        // pipeline if a ForceQuitDelay is specified. This will have no
-        // effect if the pipeline shuts down cleanly before the delay
-        // fires.
-        //
-        // For full safety, this can be set to 0 and wait out
-        // indefinitely. However for mobile clients which may have a
-        // limited amount of time to exit before the background process
-        // is killed, this offers a way to ensure the process
-        // terminates.
-        if c.cfg.ForceQuitDelay > 0 {
-            time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
-        }
-
-        // 2. Shutdown the backup queue, which will prevent any further
-        // updates from being accepted. In practice, the links should be
-        // shutdown before the client has been stopped, so all updates
-        // would have been added prior.
-        err := c.pipeline.Stop()
+        // 1. Stop the session negotiator.
+        err := c.negotiator.Stop()
         if err != nil {
             returnErr = err
         }
 
-        // 3. Once the backup queue has shutdown, wait for the main
-        // dispatcher to exit. The backup queue will signal it's
-        // completion to the dispatcher, which releases the wait group
-        // after all tasks have been assigned to session queues.
+        // 2. Stop the backup dispatcher and any other goroutines.
         close(c.quit)
         c.wg.Wait()
 
-        // 4. Since all valid tasks have been assigned to session
-        // queues, we no longer need to negotiate sessions.
-        err = c.negotiator.Stop()
-        if err != nil {
-            returnErr = err
+        // 3. If there was a left over 'prevTask' from the backup
+        // dispatcher, replay that onto the pipeline.
+        if c.prevTask != nil {
+            err = c.pipeline.QueueBackupID(c.prevTask)
+            if err != nil {
+                returnErr = err
+            }
         }
 
-        c.log.Debugf("Waiting for active session queues to finish "+
-            "draining, stats: %s", c.stats)
-
-        // 5. Shutdown all active session queues in parallel. These will
-        // exit once all updates have been acked by the watchtower.
+        // 4. Shutdown all active session queues in parallel. These will
+        // exit once all unhandled updates have been replayed to the
+        // task pipeline.
         c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
-            return s.Stop
+            return func() {
+                err := s.Stop()
+                if err != nil {
+                    c.log.Errorf("could not stop session "+
+                        "queue: %s: %v", s.ID(), err)
+
+                    returnErr = err
+                }
+            }
         })
 
-        // Skip log if force quitting.
-        select {
-        case <-c.forceQuit:
-            return
-        default:
+        // 5. Shutdown the backup queue, which will prevent any further
+        // updates from being accepted.
+        if err = c.pipeline.Stop(); err != nil {
+            returnErr = err
         }
 
         c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
@@ -757,43 +723,6 @@ func (c *TowerClient) Stop() error {
     return returnErr
 }
 
-// ForceQuit idempotently initiates an unclean shutdown of the watchtower
-// client. This should only be executed if Stop is unable to exit cleanly.
-func (c *TowerClient) ForceQuit() {
-    c.forced.Do(func() {
-        c.log.Infof("Force quitting watchtower client")
-
-        // 1. Shutdown the backup queue, which will prevent any further
-        // updates from being accepted. In practice, the links should be
-        // shutdown before the client has been stopped, so all updates
-        // would have been added prior.
-        err := c.pipeline.Stop()
-        if err != nil {
-            c.log.Errorf("could not stop backup queue: %v", err)
-        }
-
-        // 2. Once the backup queue has shutdown, wait for the main
-        // dispatcher to exit. The backup queue will signal it's
-        // completion to the dispatcher, which releases the wait group
-        // after all tasks have been assigned to session queues.
-        close(c.forceQuit)
-        c.wg.Wait()
-
-        // 3. Since all valid tasks have been assigned to session
-        // queues, we no longer need to negotiate sessions.
-        c.negotiator.Stop()
-
-        // 4. Force quit all active session queues in parallel. These
-        // will exit once all updates have been acked by the watchtower.
-        c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
-            return s.ForceQuit
-        })
-
-        c.log.Infof("Watchtower client unclean shutdown complete, "+
-            "stats: %s", c.stats)
-    })
-}
-
 // RegisterChannel persistently initializes any channel-dependent parameters
 // within the client. This should be called during link startup to ensure that
 // the client is able to support the link during operation.
@@ -832,7 +761,6 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {
 
 // BackupState initiates a request to back up a particular revoked state. If the
 // method returns nil, the backup is guaranteed to be successful unless the:
-// - client is force quit,
 // - justice transaction would create dust outputs when trying to abide by the
 // negotiated policy, or
 // - breached outputs contain too little value to sweep at the target sweep
@@ -955,9 +883,6 @@ func (c *TowerClient) handleChannelCloses(chanSub subscribe.Subscription) {
                     err)
             }
 
-        case <-c.forceQuit:
-            return
-
         case <-c.quit:
             return
         }
@@ -1085,9 +1010,6 @@ func (c *TowerClient) handleClosableSessions(
                 }
             }
 
-        case <-c.forceQuit:
-            return
-
         case <-c.quit:
             return
         }
@@ -1246,8 +1168,7 @@ func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error {
 
 // backupDispatcher processes events coming from the taskPipeline and is
 // responsible for detecting when the client needs to renegotiate a session to
-// fulfill continuing demand. The event loop exits after all tasks have been
-// received from the upstream taskPipeline, or the taskPipeline is force quit.
+// fulfill continuing demand. The event loop exits if the TowerClient is quit.
 //
 // NOTE: This method MUST be run as a goroutine.
 func (c *TowerClient) backupDispatcher() {
@@ -1297,7 +1218,7 @@ func (c *TowerClient) backupDispatcher() {
             case msg := <-c.staleTowers:
                 msg.errChan <- c.handleStaleTower(msg)
 
-            case <-c.forceQuit:
+            case <-c.quit:
                 return
             }
 
@@ -1381,6 +1302,9 @@ func (c *TowerClient) backupDispatcher() {
             // of its corresponding candidate sessions as inactive.
             case msg := <-c.staleTowers:
                 msg.errChan <- c.handleStaleTower(msg)
+
+            case <-c.quit:
+                return
             }
         }
     }
@@ -1422,7 +1346,7 @@ func (c *TowerClient) processTask(task *wtdb.BackupID) {
 // sessionQueue will be removed if accepting the task left the sessionQueue in
 // an exhausted state.
 func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
-    newStatus reserveStatus) {
+    newStatus sessionQueueStatus) {
 
     c.log.Infof("Queued %v successfully for session %v", task,
         c.sessionQueue.ID())
@@ -1436,11 +1360,11 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
     switch newStatus {
 
     // The sessionQueue still has capacity after accepting this task.
-    case reserveAvailable:
+    case sessionQueueAvailable:
 
     // The sessionQueue is full after accepting this task, so we will need
     // to request a new one before proceeding.
-    case reserveExhausted:
+    case sessionQueueExhausted:
         c.stats.sessionExhausted()
 
         c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
@@ -1456,16 +1380,17 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
 // the state the was in *before* the task was rejected. The client's prevTask
 // will cache the task if the sessionQueue was exhausted beforehand, and nil
 // the sessionQueue to find a new session. If the sessionQueue was not
-// exhausted, the client marks the task as ineligible, as this implies we
-// couldn't construct a valid justice transaction given the session's policy.
+// exhausted and not shutting down, the client marks the task as ineligible, as
+// this implies we couldn't construct a valid justice transaction given the
+// session's policy.
 func (c *TowerClient) taskRejected(task *wtdb.BackupID,
-    curStatus reserveStatus) {
+    curStatus sessionQueueStatus) {
 
     switch curStatus {
 
     // The sessionQueue has available capacity but the task was rejected,
     // this indicates that the task was ineligible for backup.
-    case reserveAvailable:
+    case sessionQueueAvailable:
         c.stats.taskIneligible()
 
         c.log.Infof("Ignoring ineligible %v", task)
@@ -1491,7 +1416,7 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID,
 
     // The sessionQueue rejected the task because it is full, we will stash
     // this task and try to add it to the next available sessionQueue.
-    case reserveExhausted:
+    case sessionQueueExhausted:
         c.stats.sessionExhausted()
 
         c.log.Debugf("Session %v exhausted, %v queued for next session",
@@ -1501,6 +1426,18 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID,
         // once a new session queue is available.
         c.sessionQueue = nil
         c.prevTask = task
+
+    // The sessionQueue rejected the task because it is shutting down. We
+    // will stash this task and try to add it to the next available
+    // sessionQueue.
+    case sessionQueueShuttingDown:
+        c.log.Debugf("Session %v is shutting down, %v queued for "+
+            "next session", c.sessionQueue.ID(), task)
+
+        // Cache the task that we pulled off, so that we can process it
+        // once a new session queue is available.
+        c.sessionQueue = nil
+        c.prevTask = task
     }
 }
 
@@ -1600,6 +1537,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession,
         MaxBackoff: c.cfg.MaxBackoff,
         Log: c.log,
         BuildBreachRetribution: c.cfg.BuildBreachRetribution,
+        TaskPipeline: c.pipeline,
     }, updates)
 }
 
@@ -1790,6 +1728,14 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
     }
     for sessionID := range sessions {
         delete(c.candidateSessions, sessionID)
+
+        // Shutdown the session so that any pending updates are
+        // replayed back onto the main task pipeline.
+        err = c.activeSessions.StopAndRemove(sessionID)
+        if err != nil {
+            c.log.Errorf("could not stop session %s: %w", sessionID,
+                err)
+        }
     }
 
     // If our active session queue corresponds to the stale tower, we'll
@@ -488,7 +488,6 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
         WriteTimeout: timeout,
         MinBackoff: time.Millisecond,
         MaxBackoff: time.Second,
-        ForceQuitDelay: 10 * time.Second,
         SessionCloseRange: 1,
         MaxTasksInMemQueue: 2,
     }
@@ -508,7 +507,9 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
     }
 
     h.startClient()
-    t.Cleanup(h.client.ForceQuit)
+    t.Cleanup(func() {
+        require.NoError(t, h.client.Stop())
+    })
 
     h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance)
     if !cfg.noRegisterChan0 {
@@ -952,27 +953,6 @@ func (s *serverHarness) restart(op func(cfg *wtserver.Config)) {
     op(s.cfg)
 }
 
-// assertUpdatesNotFound asserts that a set of hints are not found in the
-// server's DB.
-func (s *serverHarness) assertUpdatesNotFound(hints []blob.BreachHint) {
-    s.t.Helper()
-
-    hintSet := make(map[blob.BreachHint]struct{})
-    for _, hint := range hints {
-        hintSet[hint] = struct{}{}
-    }
-
-    time.Sleep(time.Second)
-
-    matches, err := s.db.QueryMatches(hints)
-    require.NoError(s.t, err, "unable to query for hints")
-
-    for _, match := range matches {
-        _, ok := hintSet[match.Hint]
-        require.False(s.t, ok, "breach hint was found in server DB")
-    }
-}
-
 // waitForUpdates blocks until the breach hints provided all appear in the
 // watchtower's database or the timeout expires. This is used to test that the
 // client in fact sends the updates to the server, even if it is offline.
@@ -1238,12 +1218,9 @@ var clientTests = []clientTest{
             h.backupState(chanID, numSent, nil)
             numSent++
 
-            // Force quit the client to abort the state updates it
-            // has queued. The sleep ensures that the session queues
-            // have enough time to commit the state updates before
-            // the client is killed.
-            time.Sleep(time.Second)
-            h.client.ForceQuit()
+            // Stop the client to abort the state updates it has
+            // queued.
+            require.NoError(h.t, h.client.Stop())
 
             // Restart the server and allow it to ack the updates
             // after the client retransmits the unacked update.
@@ -1437,8 +1414,8 @@ var clientTests = []clientTest{
             // server should have no updates.
             h.server.waitForUpdates(nil, waitTime)
 
-            // Force quit the client since it has queued backups.
-            h.client.ForceQuit()
+            // Stop the client since it has queued backups.
+            require.NoError(h.t, h.client.Stop())
 
             // Restart the server and allow it to ack session
             // creation.
@@ -1489,8 +1466,8 @@ var clientTests = []clientTest{
             // server should have no updates.
             h.server.waitForUpdates(nil, waitTime)
 
-            // Force quit the client since it has queued backups.
-            h.client.ForceQuit()
+            // Stop the client since it has queued backups.
+            require.NoError(h.t, h.client.Stop())
 
             // Restart the server and allow it to ack session
             // creation.
@@ -1672,56 +1649,6 @@ var clientTests = []clientTest{
             h.server.waitForUpdates(hints[numUpdates/2:], waitTime)
         },
     },
-    {
-        // Asserts that the client's force quite delay will properly
-        // shutdown the client if it is unable to completely drain the
-        // task pipeline.
-        name: "force unclean shutdown",
-        cfg: harnessCfg{
-            localBalance: localBalance,
-            remoteBalance: remoteBalance,
-            policy: wtpolicy.Policy{
-                TxPolicy: defaultTxPolicy,
-                MaxUpdates: 5,
-            },
-        },
-        fn: func(h *testHarness) {
-            const (
-                chanID = 0
-                numUpdates = 6
-                maxUpdates = 5
-            )
-
-            // Advance the channel to create all states.
-            hints := h.advanceChannelN(chanID, numUpdates)
-
-            // Back up 4 of the 5 states for the negotiated session.
-            h.backupStates(chanID, 0, maxUpdates-1, nil)
-            h.server.waitForUpdates(hints[:maxUpdates-1], waitTime)
-
-            // Now, restart the tower and prevent it from acking any
-            // new sessions. We do this here as once the last slot
-            // is exhausted the client will attempt to renegotiate.
-            h.server.restart(func(cfg *wtserver.Config) {
-                cfg.NoAckCreateSession = true
-            })
-
-            // Back up the remaining two states. Once the first is
-            // processed, the session will be exhausted but the
-            // client won't be able to renegotiate a session for
-            // the final state. We'll only wait for the first five
-            // states to arrive at the tower.
-            h.backupStates(chanID, maxUpdates-1, numUpdates, nil)
-            h.server.waitForUpdates(hints[:maxUpdates], waitTime)
-
-            // Finally, stop the client which will continue to
-            // attempt session negotiation since it has one more
-            // state to process. After the force quite delay
-            // expires, the client should force quite itself and
-            // allow the test to complete.
-            h.server.stop()
-        },
-    },
     {
         // Assert that if a client changes the address for a server and
         // then tries to back up updates then the client will switch to
@@ -1937,7 +1864,7 @@ var clientTests = []clientTest{
             require.False(h.t, h.isSessionClosable(sessionIDs[0]))
 
             // Restart the client.
-            h.client.ForceQuit()
+            require.NoError(h.t, h.client.Stop())
             h.startClient()
 
             // The session should now have been marked as closable.
@@ -2176,9 +2103,8 @@ var clientTests = []clientTest{
 
             h.backupStates(chanID, 0, numUpdates/2, nil)
 
-            // Restart the Client (force quit). And also now start
-            // the server.
-            h.client.ForceQuit()
+            // Restart the Client. And also now start the server.
+            require.NoError(h.t, h.client.Stop())
             h.server.start()
             h.startClient()
 
@@ -2237,8 +2163,7 @@ var clientTests = []clientTest{
     {
         // Show that if a client switches to a new tower _after_ backup
         // tasks have been bound to the session with the first old tower
-        // then these updates are _not_ replayed onto the new tower.
-        // This is a bug that will be fixed in a future commit.
+        // then these updates are replayed onto the new tower.
         name: "switch to new tower after tasks are bound",
         cfg: harnessCfg{
             localBalance: localBalance,
@@ -2290,18 +2215,11 @@ var clientTests = []clientTest{
             // Back up the final task.
             h.backupStates(chanID, numUpdates-1, numUpdates, nil)
 
-            // Show that only the latest backup is backed up to the
-            // server and that the ones backed up while no tower was
-            // online were _not_ backed up to either server. This is
-            // a bug that will be fixed in a future commit.
+            // Show that all the backups (the ones added while no
+            // towers were online and the one added after adding the
+            // second tower) are backed up to the second tower.
             server2.waitForUpdates(
-                hints[numUpdates-1:], time.Second,
-            )
-            server2.assertUpdatesNotFound(
-                hints[numUpdates/2 : numUpdates-1],
-            )
-            h.server.assertUpdatesNotFound(
-                hints[numUpdates/2 : numUpdates-1],
+                hints[numUpdates/2:numUpdates], waitTime,
             )
         },
     },
@@ -16,17 +16,21 @@ import (
     "github.com/lightningnetwork/lnd/watchtower/wtwire"
 )
 
-// reserveStatus is an enum that signals how full a particular session is.
-type reserveStatus uint8
+// sessionQueueStatus is an enum that signals how full a particular session is.
+type sessionQueueStatus uint8
 
 const (
-    // reserveAvailable indicates that the session has space for at least
-    // one more backup.
-    reserveAvailable reserveStatus = iota
+    // sessionQueueAvailable indicates that the session has space for at
+    // least one more backup.
+    sessionQueueAvailable sessionQueueStatus = iota
 
-    // reserveExhausted indicates that all slots in the session have been
-    // allocated.
-    reserveExhausted
+    // sessionQueueExhausted indicates that all slots in the session have
+    // been allocated.
+    sessionQueueExhausted
+
+    // sessionQueueShuttingDown indicates that the session queue is
+    // shutting down and so is no longer accepting any more backups.
+    sessionQueueShuttingDown
 )
 
 // sessionQueueConfig bundles the resources required by the sessionQueue to
@@ -62,6 +66,10 @@ type sessionQueueConfig struct {
     // certain revoked commitment height.
     BuildBreachRetribution BreachRetributionBuilder
 
+    // TaskPipeline is a pipeline which the sessionQueue should use to send
+    // any unhandled tasks on shutdown of the queue.
+    TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]
+
     // DB provides access to the client's stable storage.
     DB DB
 
@@ -85,10 +93,8 @@ type sessionQueueConfig struct {
 
 // sessionQueue implements a reliable queue that will encrypt and send accepted
 // backups to the watchtower specified in the config's ClientSession. Calling
-// Quit will attempt to perform a clean shutdown by receiving an ACK from the
-// tower for all pending backups before exiting. The clean shutdown can be
-// aborted by using ForceQuit, which will attempt to shut down the queue
-// immediately.
+// Stop will attempt to perform a clean shutdown replaying any un-committed
+// pending updates to the TowerClient's main task pipeline.
 type sessionQueue struct {
     started sync.Once
     stopped sync.Once
@@ -109,9 +115,8 @@ type sessionQueue struct {
 
     retryBackoff time.Duration
 
     quit chan struct{}
-    forceQuit chan struct{}
-    shutdown chan struct{}
+    wg sync.WaitGroup
 }
 
 // newSessionQueue initializes a fresh sessionQueue.
@@ -133,8 +138,6 @@ func newSessionQueue(cfg *sessionQueueConfig,
         seqNum: cfg.ClientSession.SeqNum,
         retryBackoff: cfg.MinBackoff,
         quit: make(chan struct{}),
-        forceQuit: make(chan struct{}),
-        shutdown: make(chan struct{}),
     }
     sq.queueCond = sync.NewCond(&sq.queueMtx)
 
@@ -151,41 +154,77 @@ func newSessionQueue(cfg *sessionQueueConfig,
 // backups.
 func (q *sessionQueue) Start() {
     q.started.Do(func() {
+        q.wg.Add(1)
         go q.sessionManager()
     })
 }
 
 // Stop idempotently stops the sessionQueue by initiating a clean shutdown that
 // will clear all pending tasks in the queue before returning to the caller.
-func (q *sessionQueue) Stop() {
+func (q *sessionQueue) Stop() error {
+    var returnErr error
     q.stopped.Do(func() {
         q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
 
         close(q.quit)
-        q.signalUntilShutdown()
 
-        // Skip log if we also force quit.
-        select {
-        case <-q.forceQuit:
+        shutdown := make(chan struct{})
+        go func() {
+            for {
+                select {
+                case <-time.After(time.Millisecond):
+                    q.queueCond.Signal()
+                case <-shutdown:
+                    return
+                }
+            }
+        }()
+
+        q.wg.Wait()
+        close(shutdown)
+
+        // Now, for any task in the pending queue that we have not yet
+        // created a CommittedUpdate for, re-add the task to the main
+        // task pipeline.
+        updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
+        if err != nil {
+            returnErr = err
             return
-        default:
         }
 
+        unAckedUpdates := make(map[wtdb.BackupID]bool)
+        for _, update := range updates {
+            unAckedUpdates[update.BackupID] = true
+        }
+
+        // Push any task that was on the pending queue that there is
+        // not yet a committed update for back to the main task
+        // pipeline.
+        q.queueCond.L.Lock()
+        for q.pendingQueue.Len() > 0 {
+            next := q.pendingQueue.Front()
+            q.pendingQueue.Remove(next)
+
+            //nolint:forcetypeassert
+            task := next.Value.(*backupTask)
+
+            if unAckedUpdates[task.id] {
+                continue
+            }
+
+            err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
+            if err != nil {
+                log.Errorf("could not re-queue backup task: "+
+                    "%v", err)
+                continue
+            }
+        }
+        q.queueCond.L.Unlock()
+
         q.log.Debugf("SessionQueue(%s) stopped", q.ID())
     })
-}
 
-// ForceQuit idempotently aborts any clean shutdown in progress and returns to
-// he caller after all lingering goroutines have spun down.
-func (q *sessionQueue) ForceQuit() {
-    q.forced.Do(func() {
-        q.log.Infof("SessionQueue(%s) force quitting...", q.ID())
-
-        close(q.forceQuit)
-        q.signalUntilShutdown()
-
-        q.log.Infof("SessionQueue(%s) force quit", q.ID())
-    })
+    return returnErr
 }
 
 // ID returns the wtdb.SessionID for the queue, which can be used to uniquely
@@ -196,10 +235,28 @@ func (q *sessionQueue) ID() *wtdb.SessionID {
 
 // AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
 // tower. The session will only be accepted if the queue is not already
-// exhausted and the task is successfully bound to the ClientSession.
-func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
+// exhausted or shutting down and the task is successfully bound to the
+// ClientSession.
+func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
+    // Exit early if the queue has started shutting down.
+    select {
+    case <-q.quit:
+        return sessionQueueShuttingDown, false
+    default:
+    }
+
     q.queueCond.L.Lock()
 
+    // There is a chance that sessionQueue started shutting down between
+    // the last quit channel check and waiting for the lock. So check one
+    // more time here.
+    select {
+    case <-q.quit:
+        q.queueCond.L.Unlock()
+        return sessionQueueShuttingDown, false
+    default:
+    }
+
     numPending := uint32(q.pendingQueue.Len())
     maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
     q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
@@ -207,14 +264,14 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
         q.ID(), task.id, q.seqNum, numPending, maxUpdates)
 
     // Examine the current reserve status of the session queue.
-    curStatus := q.reserveStatus()
+    curStatus := q.status()
 
     switch curStatus {
 
     // The session queue is exhausted, and cannot accept the task because it
     // is full. Reject the task such that it can be tried against a
     // different session.
-    case reserveExhausted:
+    case sessionQueueExhausted:
         q.queueCond.L.Unlock()
         return curStatus, false
 
@@ -224,7 +281,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
     // tried again.
     //
     // TODO(conner): queue backups and retry with different session params.
-    case reserveAvailable:
+    case sessionQueueAvailable:
         err := task.bindSession(
             &q.cfg.ClientSession.ClientSessionBody,
             q.cfg.BuildBreachRetribution,
@@ -244,7 +301,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
     // Finally, compute the session's *new* reserve status. This will be
     // used by the client to determine if it can continue using this session
     // queue, or if it should negotiate a new one.
-    newStatus := q.reserveStatus()
+    newStatus := q.status()
     q.queueCond.L.Unlock()
 
     q.queueCond.Signal()
@@ -255,7 +312,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
 // sessionManager is the primary event loop for the sessionQueue, and is
 // responsible for encrypting and sending accepted tasks to the tower.
 func (q *sessionQueue) sessionManager() {
-    defer close(q.shutdown)
+    defer q.wg.Done()
 
     for {
         q.queueCond.L.Lock()
@@ -266,12 +323,6 @@ func (q *sessionQueue) sessionManager() {
 
         select {
         case <-q.quit:
-            if q.commitQueue.Len() == 0 &&
-                q.pendingQueue.Len() == 0 {
-                q.queueCond.L.Unlock()
-                return
-            }
-        case <-q.forceQuit:
             q.queueCond.L.Unlock()
             return
         default:
@@ -279,12 +330,9 @@ func (q *sessionQueue) sessionManager() {
         }
         q.queueCond.L.Unlock()
 
-        // Exit immediately if a force quit has been requested. If
-        // either of the queues still has state updates to send to the
-        // tower, we may never exit in the above case if we are unable
-        // to reach the tower for some reason.
+        // Exit immediately if the sessionQueue has been stopped.
         select {
-        case <-q.forceQuit:
+        case <-q.quit:
             return
         default:
         }
@@ -333,7 +381,7 @@ func (q *sessionQueue) drainBackups() {
             q.increaseBackoff()
             select {
             case <-time.After(q.retryBackoff):
-            case <-q.forceQuit:
+            case <-q.quit:
             }
             return
         }
@@ -366,7 +414,7 @@ func (q *sessionQueue) drainBackups() {
             q.increaseBackoff()
             select {
             case <-time.After(q.retryBackoff):
-            case <-q.forceQuit:
+            case <-q.quit:
             }
             return
         }
@@ -388,7 +436,7 @@ func (q *sessionQueue) drainBackups() {
         // when we will do so.
         select {
         case <-time.After(time.Millisecond):
-        case <-q.forceQuit:
+        case <-q.quit:
             return
         }
     }
@@ -635,21 +683,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
     return nil
 }
 
-// reserveStatus returns a reserveStatus indicating whether the sessionQueue can
-// accept another task. reserveAvailable is returned when a task can be
-// accepted, and reserveExhausted is returned if the all slots in the session
-// have been allocated.
+// status returns a sessionQueueStatus indicating whether the sessionQueue can
+// accept another task. sessionQueueAvailable is returned when a task can be
+// accepted, and sessionQueueExhausted is returned if the all slots in the
+// session have been allocated.
 //
 // NOTE: This method MUST be called with queueCond's exclusive lock held.
-func (q *sessionQueue) reserveStatus() reserveStatus {
+func (q *sessionQueue) status() sessionQueueStatus {
     numPending := uint32(q.pendingQueue.Len())
     maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
 
     if uint32(q.seqNum)+numPending < maxUpdates {
-        return reserveAvailable
+        return sessionQueueAvailable
     }
 
-    return reserveExhausted
+    return sessionQueueExhausted
 
 }
 
@@ -667,19 +715,6 @@ func (q *sessionQueue) increaseBackoff() {
     }
 }
 
-// signalUntilShutdown strobes the sessionQueue's condition variable until the
-// main event loop exits.
-func (q *sessionQueue) signalUntilShutdown() {
-    for {
-        select {
-        case <-time.After(time.Millisecond):
-            q.queueCond.Signal()
-        case <-q.shutdown:
-            return
-        }
-    }
-}
-
 // sessionQueueSet maintains a mapping of SessionIDs to their corresponding
 // sessionQueue.
 type sessionQueueSet struct {
@@ -706,18 +741,18 @@ func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
 
 // StopAndRemove stops the given session queue and removes it from the
 // sessionQueueSet.
-func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) {
+func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error {
     s.mu.Lock()
     defer s.mu.Unlock()
 
     queue, ok := s.queues[id]
     if !ok {
-        return
+        return nil
     }
 
-    queue.Stop()
-
     delete(s.queues, id)
+
+    return queue.Stop()
 }
 
 // Get fetches and returns the sessionQueue with the given ID.
|
Reference in New Issue
Block a user