Mirror of https://github.com/lightningnetwork/lnd.git (synced 2025-03-26 17:52:25 +01:00)

Merge pull request #6895 from ellemouton/towerHasUnackedUpdates

wtclient: replay pending and unacked updates

This commit is contained in commit 0d2f2aa802
@@ -18,6 +18,8 @@

* [Replace in-mem task pipeline with a disk-overflow
  queue](https://github.com/lightningnetwork/lnd/pull/7380)
* [Replay pending and un-acked updates onto the main task pipeline if a tower
  is being removed](https://github.com/lightningnetwork/lnd/pull/6895)

* [Add defaults](https://github.com/lightningnetwork/lnd/pull/7771) to the
  wtclient and watchtower config structs and use these to populate the defaults
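To make the "replay onto the main task pipeline" idea concrete outside the diff itself, here is a minimal, self-contained Go sketch. All names here (backupID, pipeline, replayUnacked) are invented for illustration and are not lnd's wtclient/wtdb types; it only shows the hand-back of un-acked updates to a shared pipeline so another session can pick them up.

package main

import "fmt"

// backupID is a stand-in for a (channel ID, commit height) backup identifier.
type backupID struct {
    chanID string
    height uint64
}

// pipeline is a stand-in for the client's main task pipeline.
type pipeline struct {
    tasks chan backupID
}

func (p *pipeline) queue(id backupID) { p.tasks <- id }

// replayUnacked hands every update the session never got acknowledged back to
// the main pipeline so that a different session can back it up.
func replayUnacked(p *pipeline, unacked []backupID) {
    for _, id := range unacked {
        p.queue(id)
    }
}

func main() {
    p := &pipeline{tasks: make(chan backupID, 10)}

    // Pretend these two updates were committed to a session whose tower is
    // now being removed, and were never acked.
    replayUnacked(p, []backupID{
        {chanID: "chan-1", height: 42},
        {chanID: "chan-1", height: 43},
    })

    close(p.tasks)
    for id := range p.tasks {
        fmt.Printf("re-queued %s@%d\n", id.chanID, id.height)
    }
}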
@@ -252,9 +252,8 @@ type TowerClient interface {

// BackupState initiates a request to back up a particular revoked
// state. If the method returns nil, the backup is guaranteed to be
// successful unless the tower is unavailable and client is force quit,
// or the justice transaction would create dust outputs when trying to
// abide by the negotiated policy.
// successful unless the justice transaction would create dust outputs
// when trying to abide by the negotiated policy.
BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
}
@@ -1569,7 +1569,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
MinBackoff: 10 * time.Second,
MaxBackoff: 5 * time.Minute,
ForceQuitDelay: wtclient.DefaultForceQuitDelay,
MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
})
if err != nil {

@@ -1603,7 +1602,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
MinBackoff: 10 * time.Second,
MaxBackoff: 5 * time.Minute,
ForceQuitDelay: wtclient.DefaultForceQuitDelay,
MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
})
if err != nil {
@@ -42,11 +42,6 @@ const (
// metrics about the client's operation.
DefaultStatInterval = time.Minute

// DefaultForceQuitDelay specifies the default duration after which the
// client should abandon any pending updates or session negotiations
// before terminating.
DefaultForceQuitDelay = 10 * time.Second

// DefaultSessionCloseRange is the range over which we will generate a
// random number of blocks to delay closing a session after its last
// channel has been closed.
@@ -138,9 +133,8 @@ type Client interface {

// BackupState initiates a request to back up a particular revoked
// state. If the method returns nil, the backup is guaranteed to be
// successful unless the client is force quit, or the justice
// transaction would create dust outputs when trying to abide by the
// negotiated policy.
// successful unless the justice transaction would create dust outputs
// when trying to abide by the negotiated policy.
BackupState(chanID *lnwire.ChannelID, stateNum uint64) error

// Start initializes the watchtower client, allowing it to process requests
@@ -151,10 +145,6 @@ type Client interface {
// so, it will attempt to flush the pipeline and deliver any queued
// states to the tower before exiting.
Stop() error

// ForceQuit will forcibly shutdown the watchtower client. Calling this
// may lead to queued states being dropped.
ForceQuit()
}

// Config provides the TowerClient with access to the resources it requires to
@@ -213,13 +203,6 @@ type Config struct {
// the tower must be watching to monitor for breaches.
ChainHash chainhash.Hash

// ForceQuitDelay is the duration after attempting to shutdown that the
// client will automatically abort any pending backups if an unclean
// shutdown is detected. If the value is less than or equal to zero, a
// call to Stop may block indefinitely. The client can always be
// ForceQuit externally irrespective of the chosen parameter.
ForceQuitDelay time.Duration

// ReadTimeout is the duration we will wait during a read before
// breaking out of a blocking read. If the value is less than or equal
// to zero, the default will be used instead.
@@ -295,7 +278,6 @@ type staleTowerMsg struct {
type TowerClient struct {
started sync.Once
stopped sync.Once
forced sync.Once

cfg *Config
@@ -306,7 +288,7 @@ type TowerClient struct {
negotiator SessionNegotiator
candidateTowers TowerCandidateIterator
candidateSessions map[wtdb.SessionID]*ClientSession
activeSessions sessionQueueSet
activeSessions *sessionQueueSet

sessionQueue *sessionQueue
prevTask *wtdb.BackupID
@@ -323,9 +305,8 @@ type TowerClient struct {
newTowers chan *newTowerMsg
staleTowers chan *staleTowerMsg

wg sync.WaitGroup
quit chan struct{}
forceQuit chan struct{}
wg sync.WaitGroup
quit chan struct{}
}

// Compile-time constraint to ensure *TowerClient implements the Client
@@ -378,14 +359,13 @@ func New(config *Config) (*TowerClient, error) {
log: plog,
pipeline: queue,
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: make(sessionQueueSet),
activeSessions: newSessionQueueSet(),
summaries: chanSummaries,
closableSessionQueue: newSessionCloseMinHeap(),
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
newTowers: make(chan *newTowerMsg),
staleTowers: make(chan *staleTowerMsg),
forceQuit: make(chan struct{}),
quit: make(chan struct{}),
}
@@ -697,58 +677,44 @@ func (c *TowerClient) Stop() error {
c.stopped.Do(func() {
c.log.Debugf("Stopping watchtower client")

// 1. To ensure we don't hang forever on shutdown due to
// unintended failures, we'll delay a call to force quit the
// pipeline if a ForceQuitDelay is specified. This will have no
// effect if the pipeline shuts down cleanly before the delay
// fires.
//
// For full safety, this can be set to 0 and wait out
// indefinitely. However for mobile clients which may have a
// limited amount of time to exit before the background process
// is killed, this offers a way to ensure the process
// terminates.
if c.cfg.ForceQuitDelay > 0 {
time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
}

// 2. Shutdown the backup queue, which will prevent any further
// updates from being accepted. In practice, the links should be
// shutdown before the client has been stopped, so all updates
// would have been added prior.
err := c.pipeline.Stop()
// 1. Stop the session negotiator.
err := c.negotiator.Stop()
if err != nil {
returnErr = err
}

// 3. Once the backup queue has shutdown, wait for the main
// dispatcher to exit. The backup queue will signal it's
// completion to the dispatcher, which releases the wait group
// after all tasks have been assigned to session queues.
// 2. Stop the backup dispatcher and any other goroutines.
close(c.quit)
c.wg.Wait()

// 4. Since all valid tasks have been assigned to session
// queues, we no longer need to negotiate sessions.
err = c.negotiator.Stop()
if err != nil {
returnErr = err
// 3. If there was a left over 'prevTask' from the backup
// dispatcher, replay that onto the pipeline.
if c.prevTask != nil {
err = c.pipeline.QueueBackupID(c.prevTask)
if err != nil {
returnErr = err
}
}

c.log.Debugf("Waiting for active session queues to finish "+
"draining, stats: %s", c.stats)

// 5. Shutdown all active session queues in parallel. These will
// exit once all updates have been acked by the watchtower.
// 4. Shutdown all active session queues in parallel. These will
// exit once all unhandled updates have been replayed to the
// task pipeline.
c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
return s.Stop
return func() {
err := s.Stop(false)
if err != nil {
c.log.Errorf("could not stop session "+
"queue: %s: %v", s.ID(), err)

returnErr = err
}
}
})

// Skip log if force quitting.
select {
case <-c.forceQuit:
return
default:
// 5. Shutdown the backup queue, which will prevent any further
// updates from being accepted.
if err = c.pipeline.Stop(); err != nil {
returnErr = err
}

c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
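The reordered shutdown in this hunk (stop the negotiator, close the quit channel and wait for the dispatcher, replay any leftover task, drain the session queues, and only then stop the pipeline) boils down to stopping consumers before producers so nothing in flight is lost. A rough, generic Go sketch of that ordering with invented names, not lnd's actual Stop implementation:

package main

import (
    "fmt"
    "sync"
)

type shutdownExample struct {
    wg       sync.WaitGroup
    quit     chan struct{}
    pipeline chan int // pending work
    prevTask *int     // task pulled off but not yet handed to a session
}

func (s *shutdownExample) dispatcher() {
    defer s.wg.Done()
    for {
        select {
        case t := <-s.pipeline:
            // Pretend the dispatcher stashes the task it is holding.
            s.prevTask = &t
        case <-s.quit:
            return
        }
    }
}

func (s *shutdownExample) stop() {
    // 1. Stop the consumer first: close quit and wait for it to exit.
    close(s.quit)
    s.wg.Wait()

    // 2. Replay anything the consumer was still holding.
    if s.prevTask != nil {
        s.pipeline <- *s.prevTask
        fmt.Println("replayed leftover task", *s.prevTask)
    }

    // 3. Only now is it safe to tear down the pipeline itself.
    close(s.pipeline)
}

func main() {
    s := &shutdownExample{
        quit:     make(chan struct{}),
        pipeline: make(chan int, 4),
    }
    s.wg.Add(1)
    go s.dispatcher()

    s.pipeline <- 7
    s.stop()
}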
@@ -757,43 +723,6 @@ func (c *TowerClient) Stop() error {
return returnErr
}

// ForceQuit idempotently initiates an unclean shutdown of the watchtower
// client. This should only be executed if Stop is unable to exit cleanly.
func (c *TowerClient) ForceQuit() {
c.forced.Do(func() {
c.log.Infof("Force quitting watchtower client")

// 1. Shutdown the backup queue, which will prevent any further
// updates from being accepted. In practice, the links should be
// shutdown before the client has been stopped, so all updates
// would have been added prior.
err := c.pipeline.Stop()
if err != nil {
c.log.Errorf("could not stop backup queue: %v", err)
}

// 2. Once the backup queue has shutdown, wait for the main
// dispatcher to exit. The backup queue will signal it's
// completion to the dispatcher, which releases the wait group
// after all tasks have been assigned to session queues.
close(c.forceQuit)
c.wg.Wait()

// 3. Since all valid tasks have been assigned to session
// queues, we no longer need to negotiate sessions.
c.negotiator.Stop()

// 4. Force quit all active session queues in parallel. These
// will exit once all updates have been acked by the watchtower.
c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
return s.ForceQuit
})

c.log.Infof("Watchtower client unclean shutdown complete, "+
"stats: %s", c.stats)
})
}

// RegisterChannel persistently initializes any channel-dependent parameters
// within the client. This should be called during link startup to ensure that
// the client is able to support the link during operation.
@@ -832,7 +761,6 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {

// BackupState initiates a request to back up a particular revoked state. If the
// method returns nil, the backup is guaranteed to be successful unless the:
// - client is force quit,
// - justice transaction would create dust outputs when trying to abide by the
// negotiated policy, or
// - breached outputs contain too little value to sweep at the target sweep
@@ -955,9 +883,6 @@ func (c *TowerClient) handleChannelCloses(chanSub subscribe.Subscription) {
err)
}

case <-c.forceQuit:
return

case <-c.quit:
return
}
@@ -1085,9 +1010,6 @@ func (c *TowerClient) handleClosableSessions(
}
}

case <-c.forceQuit:
return

case <-c.quit:
return
}
@@ -1246,8 +1168,7 @@ func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error {

// backupDispatcher processes events coming from the taskPipeline and is
// responsible for detecting when the client needs to renegotiate a session to
// fulfill continuing demand. The event loop exits after all tasks have been
// received from the upstream taskPipeline, or the taskPipeline is force quit.
// fulfill continuing demand. The event loop exits if the TowerClient is quit.
//
// NOTE: This method MUST be run as a goroutine.
func (c *TowerClient) backupDispatcher() {
@@ -1297,7 +1218,7 @@ func (c *TowerClient) backupDispatcher() {
case msg := <-c.staleTowers:
msg.errChan <- c.handleStaleTower(msg)

case <-c.forceQuit:
case <-c.quit:
return
}
@@ -1381,6 +1302,9 @@ func (c *TowerClient) backupDispatcher() {
// of its corresponding candidate sessions as inactive.
case msg := <-c.staleTowers:
msg.errChan <- c.handleStaleTower(msg)

case <-c.quit:
return
}
}
}
@@ -1422,7 +1346,7 @@ func (c *TowerClient) processTask(task *wtdb.BackupID) {
// sessionQueue will be removed if accepting the task left the sessionQueue in
// an exhausted state.
func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
newStatus reserveStatus) {
newStatus sessionQueueStatus) {

c.log.Infof("Queued %v successfully for session %v", task,
c.sessionQueue.ID())
@@ -1436,11 +1360,11 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
switch newStatus {

// The sessionQueue still has capacity after accepting this task.
case reserveAvailable:
case sessionQueueAvailable:

// The sessionQueue is full after accepting this task, so we will need
// to request a new one before proceeding.
case reserveExhausted:
case sessionQueueExhausted:
c.stats.sessionExhausted()

c.log.Debugf("Session %s exhausted", c.sessionQueue.ID())
@@ -1456,16 +1380,17 @@
// the state it was in *before* the task was rejected. The client's prevTask
// will cache the task if the sessionQueue was exhausted beforehand, and nil
// the sessionQueue to find a new session. If the sessionQueue was not
// exhausted, the client marks the task as ineligible, as this implies we
// couldn't construct a valid justice transaction given the session's policy.
// exhausted and not shutting down, the client marks the task as ineligible, as
// this implies we couldn't construct a valid justice transaction given the
// session's policy.
func (c *TowerClient) taskRejected(task *wtdb.BackupID,
curStatus reserveStatus) {
curStatus sessionQueueStatus) {

switch curStatus {

// The sessionQueue has available capacity but the task was rejected,
// this indicates that the task was ineligible for backup.
case reserveAvailable:
case sessionQueueAvailable:
c.stats.taskIneligible()

c.log.Infof("Ignoring ineligible %v", task)
@@ -1491,7 +1416,7 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID,

// The sessionQueue rejected the task because it is full, we will stash
// this task and try to add it to the next available sessionQueue.
case reserveExhausted:
case sessionQueueExhausted:
c.stats.sessionExhausted()

c.log.Debugf("Session %v exhausted, %v queued for next session",
@@ -1501,6 +1426,18 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID,
// once a new session queue is available.
c.sessionQueue = nil
c.prevTask = task

// The sessionQueue rejected the task because it is shutting down. We
// will stash this task and try to add it to the next available
// sessionQueue.
case sessionQueueShuttingDown:
c.log.Debugf("Session %v is shutting down, %v queued for "+
"next session", c.sessionQueue.ID(), task)

// Cache the task that we pulled off, so that we can process it
// once a new session queue is available.
c.sessionQueue = nil
c.prevTask = task
}
}
@@ -1600,6 +1537,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession,
MaxBackoff: c.cfg.MaxBackoff,
Log: c.log,
BuildBreachRetribution: c.cfg.BuildBreachRetribution,
TaskPipeline: c.pipeline,
}, updates)
}
@@ -1609,7 +1547,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession,
func (c *TowerClient) getOrInitActiveQueue(s *ClientSession,
updates []wtdb.CommittedUpdate) *sessionQueue {

if sq, ok := c.activeSessions[s.ID]; ok {
if sq, ok := c.activeSessions.Get(s.ID); ok {
return sq
}
@@ -1628,12 +1566,10 @@ func (c *TowerClient) initActiveQueue(s *ClientSession,
sq := c.newSessionQueue(s, updates)

// Add the session queue as an active session so that we remember to
// stop it on shutdown.
c.activeSessions.Add(sq)

// Start the queue so that it can be active in processing newly assigned
// tasks or to upload previously committed updates.
sq.Start()
// stop it on shutdown. This method will also start the queue so that it
// can be active in processing newly assigned tasks or to upload
// previously committed updates.
c.activeSessions.AddAndStart(sq)

return sq
}
@@ -1741,10 +1677,10 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey,
}
}

// handleNewTower handles a request for an existing tower to be removed. If none
// of the tower's sessions have pending updates, then they will become inactive
// and removed as candidates. If the active session queue corresponds to any of
// these sessions, a new one will be negotiated.
// handleStaleTower handles a request for an existing tower to be removed. If
// none of the tower's sessions have pending updates, then they will become
// inactive and removed as candidates. If the active session queue corresponds
// to any of these sessions, a new one will be negotiated.
func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
// We'll load the tower before potentially removing it in order to
// retrieve its ID within the database.
@@ -1753,37 +1689,20 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
return err
}

// We'll first update our in-memory state followed by our persisted
// state, with the stale tower. The removal of the tower address from
// the in-memory state will fail if the address is currently being used
// for a session negotiation.
err = c.candidateTowers.RemoveCandidate(dbTower.ID, msg.addr)
// If an address was provided, then we're only meant to remove the
// address from the tower.
if msg.addr != nil {
return c.removeTowerAddr(dbTower, msg.addr)
}

// Otherwise, the tower should no longer be used for future session
// negotiations and backups. First, we'll update our in-memory state
// with the stale tower.
err = c.candidateTowers.RemoveCandidate(dbTower.ID, nil)
if err != nil {
return err
}

if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil {
// If the persisted state update fails, re-add the address to
// our in-memory state.
tower, newTowerErr := NewTowerFromDBTower(dbTower)
if newTowerErr != nil {
log.Errorf("could not create new in-memory tower: %v",
newTowerErr)
} else {
c.candidateTowers.AddCandidate(tower)
}

return err
}

// If an address was provided, then we're only meant to remove the
// address from the tower, so there's nothing left for us to do.
if msg.addr != nil {
return nil
}

// Otherwise, the tower should no longer be used for future session
// negotiations and backups.
pubKey := msg.pubKey.SerializeCompressed()
sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID)
if err != nil {
@@ -1792,6 +1711,14 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
}
for sessionID := range sessions {
delete(c.candidateSessions, sessionID)

// Shutdown the session so that any pending updates are
// replayed back onto the main task pipeline.
err = c.activeSessions.StopAndRemove(sessionID)
if err != nil {
c.log.Errorf("could not stop session %s: %w", sessionID,
err)
}
}

// If our active session queue corresponds to the stale tower, we'll
@@ -1804,6 +1731,40 @@
}
}

// Finally, we will update our persisted state with the stale tower.
return c.cfg.DB.RemoveTower(msg.pubKey, nil)
}

// removeTowerAddr removes the given address from the tower.
func (c *TowerClient) removeTowerAddr(tower *wtdb.Tower, addr net.Addr) error {
if addr == nil {
return fmt.Errorf("an address must be provided")
}

// We'll first update our in-memory state followed by our persisted
// state with the stale tower. The removal of the tower address from
// the in-memory state will fail if the address is currently being used
// for a session negotiation.
err := c.candidateTowers.RemoveCandidate(tower.ID, addr)
if err != nil {
return err
}

err = c.cfg.DB.RemoveTower(tower.IdentityKey, addr)
if err != nil {
// If the persisted state update fails, re-add the address to
// our in-memory state.
tower, newTowerErr := NewTowerFromDBTower(tower)
if newTowerErr != nil {
log.Errorf("could not create new in-memory tower: %v",
newTowerErr)
} else {
c.candidateTowers.AddCandidate(tower)
}

return err
}

return nil
}
(File diff suppressed because it is too large.)
@@ -135,6 +135,10 @@ type DB interface {
// GetDBQueue returns a BackupID Queue instance under the given name
// space.
GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID]

// DeleteCommittedUpdate deletes the committed update belonging to the
// given session and with the given sequence number from the db.
DeleteCommittedUpdate(id *wtdb.SessionID, seqNum uint16) error
}

// AuthDialer connects to a remote node using an authenticated transport, such
@@ -16,17 +16,21 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)

// reserveStatus is an enum that signals how full a particular session is.
type reserveStatus uint8
// sessionQueueStatus is an enum that signals how full a particular session is.
type sessionQueueStatus uint8

const (
// reserveAvailable indicates that the session has space for at least
// one more backup.
reserveAvailable reserveStatus = iota
// sessionQueueAvailable indicates that the session has space for at
// least one more backup.
sessionQueueAvailable sessionQueueStatus = iota

// reserveExhausted indicates that all slots in the session have been
// allocated.
reserveExhausted
// sessionQueueExhausted indicates that all slots in the session have
// been allocated.
sessionQueueExhausted

// sessionQueueShuttingDown indicates that the session queue is
// shutting down and so is no longer accepting any more backups.
sessionQueueShuttingDown
)

// sessionQueueConfig bundles the resources required by the sessionQueue to
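The third enum value is what lets the client tell "this queue is full" apart from "this queue is going away". A tiny Go sketch of how a caller can branch on such a status; the enum mirrors the shape of the one in this hunk, while nextAction and its strings are invented for illustration rather than taken from wtclient:

package main

import "fmt"

type sessionQueueStatus uint8

const (
    sessionQueueAvailable sessionQueueStatus = iota
    sessionQueueExhausted
    sessionQueueShuttingDown
)

// nextAction sketches how a dispatcher might react to each status: keep using
// the queue, negotiate a fresh session, or stash the task and swap queues.
func nextAction(s sessionQueueStatus) string {
    switch s {
    case sessionQueueAvailable:
        return "keep using this session queue"
    case sessionQueueExhausted:
        return "negotiate a new session; the task may be retried there"
    case sessionQueueShuttingDown:
        return "re-queue the task and pick another session queue"
    default:
        return "unknown status"
    }
}

func main() {
    for _, s := range []sessionQueueStatus{
        sessionQueueAvailable, sessionQueueExhausted, sessionQueueShuttingDown,
    } {
        fmt.Println(nextAction(s))
    }
}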
@@ -62,6 +66,10 @@ type sessionQueueConfig struct {
// certain revoked commitment height.
BuildBreachRetribution BreachRetributionBuilder

// TaskPipeline is a pipeline which the sessionQueue should use to send
// any unhandled tasks on shutdown of the queue.
TaskPipeline *DiskOverflowQueue[*wtdb.BackupID]

// DB provides access to the client's stable storage.
DB DB
@@ -85,10 +93,8 @@ type sessionQueueConfig struct {

// sessionQueue implements a reliable queue that will encrypt and send accepted
// backups to the watchtower specified in the config's ClientSession. Calling
// Quit will attempt to perform a clean shutdown by receiving an ACK from the
// tower for all pending backups before exiting. The clean shutdown can be
// aborted by using ForceQuit, which will attempt to shut down the queue
// immediately.
// Stop will attempt to perform a clean shutdown replaying any un-committed
// pending updates to the TowerClient's main task pipeline.
type sessionQueue struct {
started sync.Once
stopped sync.Once
@@ -109,9 +115,8 @@ type sessionQueue struct {

retryBackoff time.Duration

quit chan struct{}
forceQuit chan struct{}
shutdown chan struct{}
quit chan struct{}
wg sync.WaitGroup
}

// newSessionQueue initializes a fresh sessionQueue.
@@ -133,8 +138,6 @@ func newSessionQueue(cfg *sessionQueueConfig,
seqNum: cfg.ClientSession.SeqNum,
retryBackoff: cfg.MinBackoff,
quit: make(chan struct{}),
forceQuit: make(chan struct{}),
shutdown: make(chan struct{}),
}
sq.queueCond = sync.NewCond(&sq.queueMtx)
@@ -151,41 +154,102 @@ func newSessionQueue(cfg *sessionQueueConfig,
// backups.
func (q *sessionQueue) Start() {
q.started.Do(func() {
q.wg.Add(1)
go q.sessionManager()
})
}

// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
// will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() {
// The final param should only be set to true if this is the last time that
// this session will be used. Otherwise, during normal shutdown, the final param
// should be false.
func (q *sessionQueue) Stop(final bool) error {
var returnErr error
q.stopped.Do(func() {
q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())

close(q.quit)
q.signalUntilShutdown()

// Skip log if we also force quit.
select {
case <-q.forceQuit:
shutdown := make(chan struct{})
go func() {
for {
select {
case <-time.After(time.Millisecond):
q.queueCond.Signal()
case <-shutdown:
return
}
}
}()

q.wg.Wait()
close(shutdown)

// Now, for any task in the pending queue that we have not yet
// created a CommittedUpdate for, re-add the task to the main
// task pipeline.
updates, err := q.cfg.DB.FetchSessionCommittedUpdates(q.ID())
if err != nil {
returnErr = err
return
default:
}

unAckedUpdates := make(map[wtdb.BackupID]bool)
for _, update := range updates {
unAckedUpdates[update.BackupID] = true

if !final {
continue
}

err := q.cfg.TaskPipeline.QueueBackupID(
&update.BackupID,
)
if err != nil {
log.Errorf("could not re-queue %s: %v",
update.BackupID, err)
continue
}

err = q.cfg.DB.DeleteCommittedUpdate(
q.ID(), update.SeqNum,
)
if err != nil {
log.Errorf("could not delete committed "+
"update %d for session %s",
update.SeqNum, q.ID())
}
}

// Push any task that was on the pending queue that there is
// not yet a committed update for back to the main task
// pipeline.
q.queueCond.L.Lock()
for q.pendingQueue.Len() > 0 {
next := q.pendingQueue.Front()
q.pendingQueue.Remove(next)

//nolint:forcetypeassert
task := next.Value.(*backupTask)

if unAckedUpdates[task.id] {
continue
}

err := q.cfg.TaskPipeline.QueueBackupID(&task.id)
if err != nil {
log.Errorf("could not re-queue backup task: "+
"%v", err)
continue
}
}
q.queueCond.L.Unlock()

q.log.Debugf("SessionQueue(%s) stopped", q.ID())
})
}

// ForceQuit idempotently aborts any clean shutdown in progress and returns to
// the caller after all lingering goroutines have spun down.
func (q *sessionQueue) ForceQuit() {
q.forced.Do(func() {
q.log.Infof("SessionQueue(%s) force quitting...", q.ID())

close(q.forceQuit)
q.signalUntilShutdown()

q.log.Infof("SessionQueue(%s) force quit", q.ID())
})
return returnErr
}

// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
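The new Stop keeps poking the queue's condition variable from a helper goroutine until wg.Wait() returns, because the session manager may be parked in queueCond.Wait() and would otherwise never notice the closed quit channel. A self-contained Go sketch of that "signal until the waiter exits" idiom (standalone, not wtclient code):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var (
        mu   sync.Mutex
        cond = sync.NewCond(&mu)
        quit = make(chan struct{})
        wg   sync.WaitGroup
    )

    // Waiter: sleeps on the condition variable and only re-checks the quit
    // channel each time it is woken up.
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        defer mu.Unlock()
        for {
            select {
            case <-quit:
                return
            default:
            }
            cond.Wait()
        }
    }()

    close(quit)

    // Keep signalling until the waiter has actually exited, then stop the
    // helper goroutine.
    done := make(chan struct{})
    go func() {
        for {
            select {
            case <-time.After(time.Millisecond):
                cond.Signal()
            case <-done:
                return
            }
        }
    }()

    wg.Wait()
    close(done)
    fmt.Println("waiter exited cleanly")
}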
@@ -196,10 +260,28 @@ func (q *sessionQueue) ID() *wtdb.SessionID {

// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
// tower. The session will only be accepted if the queue is not already
// exhausted and the task is successfully bound to the ClientSession.
func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
// exhausted or shutting down and the task is successfully bound to the
// ClientSession.
func (q *sessionQueue) AcceptTask(task *backupTask) (sessionQueueStatus, bool) {
// Exit early if the queue has started shutting down.
select {
case <-q.quit:
return sessionQueueShuttingDown, false
default:
}

q.queueCond.L.Lock()

// There is a chance that sessionQueue started shutting down between
// the last quit channel check and waiting for the lock. So check one
// more time here.
select {
case <-q.quit:
q.queueCond.L.Unlock()
return sessionQueueShuttingDown, false
default:
}

numPending := uint32(q.pendingQueue.Len())
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
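The double select on q.quit above (once before blocking on the lock, once after acquiring it) is a common way to avoid accepting work into a queue that began shutting down while the caller was waiting for the mutex. A stripped-down sketch of the pattern with a plain mutex and invented names, independent of the wtclient types:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errShuttingDown = errors.New("queue is shutting down")

type queue struct {
    mu    sync.Mutex
    items []int
    quit  chan struct{}
}

func (q *queue) accept(item int) error {
    // First check: don't even contend for the lock once shutdown began.
    select {
    case <-q.quit:
        return errShuttingDown
    default:
    }

    q.mu.Lock()
    defer q.mu.Unlock()

    // Second check: shutdown may have started while we waited for the lock.
    select {
    case <-q.quit:
        return errShuttingDown
    default:
    }

    q.items = append(q.items, item)
    return nil
}

func main() {
    q := &queue{quit: make(chan struct{})}
    fmt.Println(q.accept(1)) // <nil>
    close(q.quit)
    fmt.Println(q.accept(2)) // queue is shutting down
}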
@@ -207,14 +289,14 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
q.ID(), task.id, q.seqNum, numPending, maxUpdates)

// Examine the current reserve status of the session queue.
curStatus := q.reserveStatus()
curStatus := q.status()

switch curStatus {

// The session queue is exhausted, and cannot accept the task because it
// is full. Reject the task such that it can be tried against a
// different session.
case reserveExhausted:
case sessionQueueExhausted:
q.queueCond.L.Unlock()
return curStatus, false
@@ -224,7 +306,7 @@
// tried again.
//
// TODO(conner): queue backups and retry with different session params.
case reserveAvailable:
case sessionQueueAvailable:
err := task.bindSession(
&q.cfg.ClientSession.ClientSessionBody,
q.cfg.BuildBreachRetribution,
@@ -244,7 +326,7 @@
// Finally, compute the session's *new* reserve status. This will be
// used by the client to determine if it can continue using this session
// queue, or if it should negotiate a new one.
newStatus := q.reserveStatus()
newStatus := q.status()
q.queueCond.L.Unlock()

q.queueCond.Signal()
@@ -255,7 +337,7 @@
// sessionManager is the primary event loop for the sessionQueue, and is
// responsible for encrypting and sending accepted tasks to the tower.
func (q *sessionQueue) sessionManager() {
defer close(q.shutdown)
defer q.wg.Done()

for {
q.queueCond.L.Lock()
@@ -266,12 +348,6 @@

select {
case <-q.quit:
if q.commitQueue.Len() == 0 &&
q.pendingQueue.Len() == 0 {
q.queueCond.L.Unlock()
return
}
case <-q.forceQuit:
q.queueCond.L.Unlock()
return
default:
@@ -279,12 +355,9 @@
}
q.queueCond.L.Unlock()

// Exit immediately if a force quit has been requested. If
// either of the queues still has state updates to send to the
// tower, we may never exit in the above case if we are unable
// to reach the tower for some reason.
// Exit immediately if the sessionQueue has been stopped.
select {
case <-q.forceQuit:
case <-q.quit:
return
default:
}
@@ -333,7 +406,7 @@ func (q *sessionQueue) drainBackups() {
q.increaseBackoff()
select {
case <-time.After(q.retryBackoff):
case <-q.forceQuit:
case <-q.quit:
}
return
}
@@ -366,7 +439,7 @@
q.increaseBackoff()
select {
case <-time.After(q.retryBackoff):
case <-q.forceQuit:
case <-q.quit:
}
return
}
@@ -388,7 +461,7 @@
// when we will do so.
select {
case <-time.After(time.Millisecond):
case <-q.forceQuit:
case <-q.quit:
return
}
}
@@ -635,21 +708,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
return nil
}

// reserveStatus returns a reserveStatus indicating whether the sessionQueue can
// accept another task. reserveAvailable is returned when a task can be
// accepted, and reserveExhausted is returned if the all slots in the session
// have been allocated.
// status returns a sessionQueueStatus indicating whether the sessionQueue can
// accept another task. sessionQueueAvailable is returned when a task can be
// accepted, and sessionQueueExhausted is returned if the all slots in the
// session have been allocated.
//
// NOTE: This method MUST be called with queueCond's exclusive lock held.
func (q *sessionQueue) reserveStatus() reserveStatus {
func (q *sessionQueue) status() sessionQueueStatus {
numPending := uint32(q.pendingQueue.Len())
maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)

if uint32(q.seqNum)+numPending < maxUpdates {
return reserveAvailable
return sessionQueueAvailable
}

return reserveExhausted
return sessionQueueExhausted

}
@@ -667,34 +740,65 @@ func (q *sessionQueue) increaseBackoff() {
}
}

// signalUntilShutdown strobes the sessionQueue's condition variable until the
// main event loop exits.
func (q *sessionQueue) signalUntilShutdown() {
for {
select {
case <-time.After(time.Millisecond):
q.queueCond.Signal()
case <-q.shutdown:
return
}
}
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
// sessionQueue.
type sessionQueueSet struct {
queues map[wtdb.SessionID]*sessionQueue
mu sync.Mutex
}

// newSessionQueueSet constructs a new sessionQueueSet.
func newSessionQueueSet() *sessionQueueSet {
return &sessionQueueSet{
queues: make(map[wtdb.SessionID]*sessionQueue),
}
}

// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
// sessionQueue.
type sessionQueueSet map[wtdb.SessionID]*sessionQueue
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
s.mu.Lock()
defer s.mu.Unlock()

// Add inserts a sessionQueue into the sessionQueueSet.
func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) {
(*s)[*sessionQueue.ID()] = sessionQueue
s.queues[*sessionQueue.ID()] = sessionQueue

sessionQueue.Start()
}

// StopAndRemove stops the given session queue and removes it from the
// sessionQueueSet.
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error {
s.mu.Lock()
defer s.mu.Unlock()

queue, ok := s.queues[id]
if !ok {
return nil
}

delete(s.queues, id)

return queue.Stop(true)
}

// Get fetches and returns the sessionQueue with the given ID.
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
s.mu.Lock()
defer s.mu.Unlock()

q, ok := s.queues[id]

return q, ok
}

// ApplyAndWait executes the nil-adic function returned from getApply for each
// sessionQueue in the set in parallel, then waits for all of them to finish
// before returning to the caller.
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
s.mu.Lock()
defer s.mu.Unlock()

var wg sync.WaitGroup
for _, sessionq := range *s {
for _, sessionq := range s.queues {
wg.Add(1)
go func(sq *sessionQueue) {
defer wg.Done()
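Turning sessionQueueSet into a struct with its own mutex means callers no longer touch the map directly; they go through AddAndStart, Get, StopAndRemove and ApplyAndWait. A hedged usage sketch of that shape: the method names mirror this diff, but the worker type and everything else below are invented stand-ins, not the real sessionQueue.

package main

import (
    "fmt"
    "sync"
)

// worker is a stand-in for a sessionQueue: something that can be started and
// stopped, where a final stop should replay unfinished work elsewhere.
type worker struct{ id string }

func (w *worker) Start() {
    fmt.Println("started", w.id)
}

func (w *worker) Stop(final bool) error {
    fmt.Println("stopped", w.id, "final:", final)
    return nil
}

// workerSet mirrors the shape of the new sessionQueueSet: a mutex-guarded map
// whose methods hide the map from callers.
type workerSet struct {
    mu      sync.Mutex
    workers map[string]*worker
}

func newWorkerSet() *workerSet {
    return &workerSet{workers: make(map[string]*worker)}
}

func (s *workerSet) AddAndStart(w *worker) {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.workers[w.id] = w
    w.Start()
}

func (s *workerSet) StopAndRemove(id string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    w, ok := s.workers[id]
    if !ok {
        return nil
    }
    delete(s.workers, id)

    // final=true: this worker will not be used again, so it should replay
    // any unfinished work during Stop.
    return w.Stop(true)
}

func main() {
    set := newWorkerSet()
    set.AddAndStart(&worker{id: "session-1"})
    _ = set.StopAndRemove("session-1")
}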
@@ -2073,6 +2073,42 @@ func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
)
}

// DeleteCommittedUpdate deletes the committed update with the given sequence
// number from the given session.
func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
sessions := tx.ReadWriteBucket(cSessionBkt)
if sessions == nil {
return ErrUninitializedDB
}

sessionBkt := sessions.NestedReadWriteBucket(id[:])
if sessionBkt == nil {
return fmt.Errorf("session bucket %s not found",
id.String())
}

// If the commits sub-bucket doesn't exist, there can't possibly
// be a corresponding update to remove.
sessionCommits := sessionBkt.NestedReadWriteBucket(
cSessionCommits,
)
if sessionCommits == nil {
return ErrCommittedUpdateNotFound
}

var seqNumBuf [2]byte
byteOrder.PutUint16(seqNumBuf[:], seqNum)

if sessionCommits.Get(seqNumBuf[:]) == nil {
return ErrCommittedUpdateNotFound
}

// Remove the corresponding committed update.
return sessionCommits.Delete(seqNumBuf[:])
}, func() {})
}

// putChannelToSessionMapping adds the given session ID to a channel's
// cChanSessions bucket.
func putChannelToSessionMapping(chanDetails kvdb.RwBucket,
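The committed update is keyed in its sub-bucket by the sequence number serialized as a fixed-width two-byte integer, which is why deletion only needs the session ID and seqNum. A small standard-library sketch of that key encoding; it assumes the store's byteOrder is big-endian (the usual choice so fixed-width keys sort numerically), which should be checked against wtdb rather than taken from this sketch:

package main

import (
    "encoding/binary"
    "fmt"
)

// seqNumKey builds the two-byte key used to look up a committed update by its
// sequence number. (Assumption: big-endian byte order.)
func seqNumKey(seqNum uint16) [2]byte {
    var key [2]byte
    binary.BigEndian.PutUint16(key[:], seqNum)
    return key
}

func main() {
    for _, n := range []uint16{1, 2, 256} {
        fmt.Printf("seqNum %d -> key %x\n", n, seqNumKey(n))
    }
}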
@@ -195,6 +195,15 @@ func (h *clientDBHarness) ackUpdate(id *wtdb.SessionID, seqNum uint16,
require.ErrorIs(h.t, err, expErr)
}

func (h *clientDBHarness) deleteCommittedUpdate(id *wtdb.SessionID,
seqNum uint16, expErr error) {

h.t.Helper()

err := h.db.DeleteCommittedUpdate(id, seqNum)
require.ErrorIs(h.t, err, expErr)
}

func (h *clientDBHarness) markChannelClosed(id lnwire.ChannelID,
blockHeight uint32, expErr error) []wtdb.SessionID {
@@ -567,7 +576,8 @@ func testChanSummaries(h *clientDBHarness) {
h.registerChan(chanID, expPkScript, wtdb.ErrChannelAlreadyRegistered)
}

// testCommitUpdate tests the behavior of CommitUpdate, ensuring that they can
// testCommitUpdate tests the behavior of CommitUpdate and
// DeleteCommittedUpdate.
func testCommitUpdate(h *clientDBHarness) {
const blobType = blob.TypeAltruistCommit
@@ -648,6 +658,22 @@ func testCommitUpdate(h *clientDBHarness) {
*update1,
*update2,
}, nil)

// We will now also test that the DeleteCommittedUpdates method also
// works.
// First, try to delete a committed update that does not exist.
h.deleteCommittedUpdate(
&session.ID, update4.SeqNum, wtdb.ErrCommittedUpdateNotFound,
)

// Now delete an existing committed update and ensure that it succeeds.
h.deleteCommittedUpdate(&session.ID, update1.SeqNum, nil)
h.assertUpdates(session.ID, []wtdb.CommittedUpdate{
*update2,
}, nil)

h.deleteCommittedUpdate(&session.ID, update2.SeqNum, nil)
h.assertUpdates(session.ID, []wtdb.CommittedUpdate{}, nil)
}

// testMarkChannelClosed asserts the behaviour of MarkChannelClosed.
@@ -586,6 +586,37 @@ func (m *ClientDB) GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID] {
return q
}

// DeleteCommittedUpdate deletes the committed update with the given sequence
// number from the given session.
func (m *ClientDB) DeleteCommittedUpdate(id *wtdb.SessionID,
seqNum uint16) error {

m.mu.Lock()
defer m.mu.Unlock()

// Fail if session doesn't exist.
session, ok := m.activeSessions[*id]
if !ok {
return wtdb.ErrClientSessionNotFound
}

// Retrieve the committed update, failing if none is found.
updates := m.committedUpdates[session.ID]
for i, update := range updates {
if update.SeqNum != seqNum {
continue
}

// Remove the committed update from "disk".
updates = append(updates[:i], updates[i+1:]...)
m.committedUpdates[session.ID] = updates

return nil
}

return wtdb.ErrCommittedUpdateNotFound
}

// ListClosableSessions fetches and returns the IDs for all sessions marked as
// closable.
func (m *ClientDB) ListClosableSessions() (map[wtdb.SessionID]uint32, error) {