diff --git a/server.go b/server.go index d698535e9..c5ad81f21 100644 --- a/server.go +++ b/server.go @@ -1568,17 +1568,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return s.channelNotifier. SubscribeChannelEvents() }, - Signer: cc.Wallet.Cfg.Signer, - NewAddress: newSweepPkScriptGen(cc.Wallet), - SecretKeyRing: s.cc.KeyRing, - Dial: cfg.net.Dial, - AuthDial: authDial, - DB: dbs.TowerClientDB, - Policy: policy, - ChainHash: *s.cfg.ActiveNetParams.GenesisHash, - MinBackoff: 10 * time.Second, - MaxBackoff: 5 * time.Minute, - ForceQuitDelay: wtclient.DefaultForceQuitDelay, + Signer: cc.Wallet.Cfg.Signer, + NewAddress: newSweepPkScriptGen(cc.Wallet), + SecretKeyRing: s.cc.KeyRing, + Dial: cfg.net.Dial, + AuthDial: authDial, + DB: dbs.TowerClientDB, + Policy: policy, + ChainHash: *s.cfg.ActiveNetParams.GenesisHash, + MinBackoff: 10 * time.Second, + MaxBackoff: 5 * time.Minute, + ForceQuitDelay: wtclient.DefaultForceQuitDelay, + MaxTasksInMemQueue: wtclient.DefaultMaxTasksInMemQueue, }) if err != nil { return nil, err @@ -1601,17 +1602,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return s.channelNotifier. SubscribeChannelEvents() }, - Signer: cc.Wallet.Cfg.Signer, - NewAddress: newSweepPkScriptGen(cc.Wallet), - SecretKeyRing: s.cc.KeyRing, - Dial: cfg.net.Dial, - AuthDial: authDial, - DB: dbs.TowerClientDB, - Policy: anchorPolicy, - ChainHash: *s.cfg.ActiveNetParams.GenesisHash, - MinBackoff: 10 * time.Second, - MaxBackoff: 5 * time.Minute, - ForceQuitDelay: wtclient.DefaultForceQuitDelay, + Signer: cc.Wallet.Cfg.Signer, + NewAddress: newSweepPkScriptGen(cc.Wallet), + SecretKeyRing: s.cc.KeyRing, + Dial: cfg.net.Dial, + AuthDial: authDial, + DB: dbs.TowerClientDB, + Policy: anchorPolicy, + ChainHash: *s.cfg.ActiveNetParams.GenesisHash, + MinBackoff: 10 * time.Second, + MaxBackoff: 5 * time.Minute, + ForceQuitDelay: wtclient.DefaultForceQuitDelay, + MaxTasksInMemQueue: wtclient.DefaultMaxTasksInMemQueue, }) if err != nil { return nil, err diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 2f380cf2d..9c190f810 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -244,6 +244,10 @@ type Config struct { // number of blocks to delay closing a session after its last channel // has been closed. SessionCloseRange uint32 + + // MaxTasksInMemQueue is the maximum number of backup tasks that should + // be kept in-memory. Any more tasks will overflow to disk. + MaxTasksInMemQueue uint64 } // BreachRetributionBuilder is a function that can be used to construct a @@ -297,7 +301,7 @@ type TowerClient struct { log btclog.Logger - pipeline *taskPipeline + pipeline *DiskOverflowQueue[*wtdb.BackupID] negotiator SessionNegotiator candidateTowers TowerCandidateIterator @@ -359,10 +363,21 @@ func New(config *Config) (*TowerClient, error) { return nil, err } + var ( + policy = cfg.Policy.BlobType.String() + queueDB = cfg.DB.GetDBQueue([]byte(policy)) + ) + queue, err := NewDiskOverflowQueue[*wtdb.BackupID]( + queueDB, cfg.MaxTasksInMemQueue, plog, + ) + if err != nil { + return nil, err + } + c := &TowerClient{ cfg: cfg, log: plog, - pipeline: newTaskPipeline(plog), + pipeline: queue, chanCommitHeights: make(map[lnwire.ChannelID]uint64), activeSessions: make(sessionQueueSet), summaries: chanSummaries, @@ -675,6 +690,7 @@ func (c *TowerClient) Start() error { // Stop idempotently initiates a graceful shutdown of the watchtower client. 
func (c *TowerClient) Stop() error { + var returnErr error c.stopped.Do(func() { c.log.Debugf("Stopping watchtower client") @@ -697,7 +713,10 @@ func (c *TowerClient) Stop() error { // updates from being accepted. In practice, the links should be // shutdown before the client has been stopped, so all updates // would have been added prior. - c.pipeline.Stop() + err := c.pipeline.Stop() + if err != nil { + returnErr = err + } // 3. Once the backup queue has shutdown, wait for the main // dispatcher to exit. The backup queue will signal it's @@ -728,7 +747,8 @@ func (c *TowerClient) Stop() error { c.log.Debugf("Client successfully stopped, stats: %s", c.stats) }) - return nil + + return returnErr } // ForceQuit idempotently initiates an unclean shutdown of the watchtower @@ -741,7 +761,10 @@ func (c *TowerClient) ForceQuit() { // updates from being accepted. In practice, the links should be // shutdown before the client has been stopped, so all updates // would have been added prior. - c.pipeline.ForceQuit() + err := c.pipeline.Stop() + if err != nil { + c.log.Errorf("could not stop backup queue: %v", err) + } // 2. Once the backup queue has shutdown, wait for the main // dispatcher to exit. The backup queue will signal it's @@ -840,7 +863,7 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, CommitHeight: stateNum, } - return c.pipeline.QueueBackupTask(id) + return c.pipeline.QueueBackupID(id) } // nextSessionQueue attempts to fetch an active session from our set of @@ -1327,7 +1350,7 @@ func (c *TowerClient) backupDispatcher() { // Process each backup task serially from the queue of // revoked states. - case task, ok := <-c.pipeline.NewBackupTasks(): + case task, ok := <-c.pipeline.NextBackupID(): // All backups in the pipeline have been // processed, it is now safe to exit. 
if !ok { @@ -1639,8 +1662,6 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { }: case <-c.pipeline.quit: return ErrClientExiting - case <-c.pipeline.forceQuit: - return ErrClientExiting } select { @@ -1648,8 +1669,6 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { return err case <-c.pipeline.quit: return ErrClientExiting - case <-c.pipeline.forceQuit: - return ErrClientExiting } } @@ -1706,8 +1725,6 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, }: case <-c.pipeline.quit: return ErrClientExiting - case <-c.pipeline.forceQuit: - return ErrClientExiting } select { @@ -1715,8 +1732,6 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, return err case <-c.pipeline.quit: return ErrClientExiting - case <-c.pipeline.forceQuit: - return ErrClientExiting } } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index eca99f2b7..8dc956a2c 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -509,12 +509,13 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { NewAddress: func() ([]byte, error) { return addrScript, nil }, - ReadTimeout: timeout, - WriteTimeout: timeout, - MinBackoff: time.Millisecond, - MaxBackoff: time.Second, - ForceQuitDelay: 10 * time.Second, - SessionCloseRange: 1, + ReadTimeout: timeout, + WriteTimeout: timeout, + MinBackoff: time.Millisecond, + MaxBackoff: time.Second, + ForceQuitDelay: 10 * time.Second, + SessionCloseRange: 1, + MaxTasksInMemQueue: 2, } h.clientCfg.BuildBreachRetribution = func(id lnwire.ChannelID, @@ -1094,10 +1095,6 @@ var clientTests = []clientTest{ hints := h.advanceChannelN(chanID, numUpdates) h.backupStates(chanID, 0, numUpdates, nil) - // Stop the client in the background, to assert the - // pipeline is always flushed before it exits. - go h.client.Stop() - // Wait for all the updates to be populated in the // server's database. h.waitServerUpdates(hints, time.Second) @@ -1238,10 +1235,6 @@ var clientTests = []clientTest{ // Now, queue the retributions for backup. h.backupStates(chanID, 0, numUpdates, nil) - // Stop the client in the background, to assert the - // pipeline is always flushed before it exits. - go h.client.Stop() - // Give the client time to saturate a large number of // session queues for which the server has not acked the // state updates that it has received. @@ -1346,9 +1339,6 @@ var clientTests = []clientTest{ h.backupStates(id, 0, numUpdates, nil) } - // Test reliable flush under multi-client scenario. - go h.client.Stop() - // Wait for all the updates to be populated in the // server's database. h.waitServerUpdates(hints, 10*time.Second) @@ -1395,9 +1385,6 @@ var clientTests = []clientTest{ // identical one. h.startClient() - // Now, queue the retributions for backup. - h.backupStates(chanID, 0, numUpdates, nil) - // Wait for all the updates to be populated in the // server's database. h.waitServerUpdates(hints, waitTime) @@ -1449,9 +1436,6 @@ var clientTests = []clientTest{ h.clientCfg.Policy.SweepFeeRate *= 2 h.startClient() - // Now, queue the retributions for backup. - h.backupStates(chanID, 0, numUpdates, nil) - // Wait for all the updates to be populated in the // server's database. h.waitServerUpdates(hints, waitTime) @@ -2090,9 +2074,9 @@ var clientTests = []clientTest{ }, { // This test demonstrates that if there is no active session, - // the updates are queued in memory and then lost on restart. - // This behaviour will be fixed in upcoming commits. 
- name: "lose updates in task pipeline on restart", + // the updates are persisted to disk on restart and reliably + // sent. + name: "in-mem updates not lost on restart", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, @@ -2112,36 +2096,25 @@ var clientTests = []clientTest{ numUpdates = 5 ) - // Advance the channel to create all states. + // Try back up the first few states of the client's + // channel. Since the server has not yet started, the + // client should have no active session yet and so these + // updates will just be kept in an in-memory queue. hints := h.advanceChannelN(chanID, numUpdates) - firstBatch := hints[:numUpdates/2] - secondBatch := hints[numUpdates/2 : numUpdates] - // Attempt to back up the first batch of states of the - // client's channel. Since the server has not yet - // started, the client should have no active session - // yet and so these updates will just be kept in an - // in-memory queue. h.backupStates(chanID, 0, numUpdates/2, nil) // Restart the Client (force quit). And also now start - // the server. The client should now be able to create - // a session with the server. + // the server. h.client.ForceQuit() h.startServer() h.startClient() - // Attempt to now back up the second batch of states. + // Back up a few more states. h.backupStates(chanID, numUpdates/2, numUpdates, nil) - // Assert that the server does receive the updates. - h.waitServerUpdates(secondBatch, waitTime) - - // Assert that the server definitely still has not - // received the initial set of updates. - matches, err := h.serverDB.QueryMatches(firstBatch) - require.NoError(h.t, err) - require.Empty(h.t, matches) + // Assert that the server does receive ALL the updates. + h.waitServerUpdates(hints[0:numUpdates], waitTime) }, }, } diff --git a/watchtower/wtclient/task_pipeline.go b/watchtower/wtclient/task_pipeline.go deleted file mode 100644 index 9415e1d5b..000000000 --- a/watchtower/wtclient/task_pipeline.go +++ /dev/null @@ -1,198 +0,0 @@ -package wtclient - -import ( - "container/list" - "sync" - "time" - - "github.com/btcsuite/btclog" - "github.com/lightningnetwork/lnd/watchtower/wtdb" -) - -// taskPipeline implements a reliable, in-order queue that ensures its queue -// fully drained before exiting. Stopping the taskPipeline prevents the pipeline -// from accepting any further tasks, and will cause the pipeline to exit after -// all updates have been delivered to the downstream receiver. If this process -// hangs and is unable to make progress, users can optionally call ForceQuit to -// abandon the reliable draining of the queue in order to permit shutdown. -type taskPipeline struct { - started sync.Once - stopped sync.Once - forced sync.Once - - log btclog.Logger - - queueMtx sync.Mutex - queueCond *sync.Cond - queue *list.List - - newBackupTasks chan *wtdb.BackupID - - quit chan struct{} - forceQuit chan struct{} - shutdown chan struct{} -} - -// newTaskPipeline initializes a new taskPipeline. -func newTaskPipeline(log btclog.Logger) *taskPipeline { - rq := &taskPipeline{ - log: log, - queue: list.New(), - newBackupTasks: make(chan *wtdb.BackupID), - quit: make(chan struct{}), - forceQuit: make(chan struct{}), - shutdown: make(chan struct{}), - } - rq.queueCond = sync.NewCond(&rq.queueMtx) - - return rq -} - -// Start spins up the taskPipeline, making it eligible to begin receiving backup -// tasks and deliver them to the receiver of NewBackupTasks. 
-func (q *taskPipeline) Start() { - q.started.Do(func() { - go q.queueManager() - }) -} - -// Stop begins a graceful shutdown of the taskPipeline. This method returns once -// all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes -// the delivery of pending tasks to be interrupted. -func (q *taskPipeline) Stop() { - q.stopped.Do(func() { - q.log.Debugf("Stopping task pipeline") - - close(q.quit) - q.signalUntilShutdown() - - // Skip log if we also force quit. - select { - case <-q.forceQuit: - default: - q.log.Debugf("Task pipeline stopped successfully") - } - }) -} - -// ForceQuit signals the taskPipeline to immediately exit, dropping any -// backupTasks that have not been delivered via NewBackupTasks. -func (q *taskPipeline) ForceQuit() { - q.forced.Do(func() { - q.log.Infof("Force quitting task pipeline") - - close(q.forceQuit) - q.signalUntilShutdown() - - q.log.Infof("Task pipeline unclean shutdown complete") - }) -} - -// NewBackupTasks returns a read-only channel for enqueue backupTasks. The -// channel will be closed after a call to Stop and all pending tasks have been -// delivered, or if a call to ForceQuit is called before the pending entries -// have been drained. -func (q *taskPipeline) NewBackupTasks() <-chan *wtdb.BackupID { - return q.newBackupTasks -} - -// QueueBackupTask enqueues a backupTask for reliable delivery to the consumer -// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is -// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be -// delivered via NewBackupTasks unless ForceQuit is called before completion. -func (q *taskPipeline) QueueBackupTask(task *wtdb.BackupID) error { - q.queueCond.L.Lock() - select { - - // Reject new tasks after quit has been signaled. - case <-q.quit: - q.queueCond.L.Unlock() - return ErrClientExiting - - // Reject new tasks after force quit has been signaled. - case <-q.forceQuit: - q.queueCond.L.Unlock() - return ErrClientExiting - - default: - } - - // Queue the new task and signal the queue's condition variable to wake - // up the queueManager for processing. - q.queue.PushBack(task) - q.queueCond.L.Unlock() - - q.queueCond.Signal() - - return nil -} - -// queueManager processes all incoming backup requests that get added via -// QueueBackupTask. The manager will exit -// -// NOTE: This method MUST be run as a goroutine. -func (q *taskPipeline) queueManager() { - defer close(q.shutdown) - defer close(q.newBackupTasks) - - for { - q.queueCond.L.Lock() - for q.queue.Front() == nil { - q.queueCond.Wait() - - select { - case <-q.quit: - // Exit only after the queue has been fully - // drained. - if q.queue.Len() == 0 { - q.queueCond.L.Unlock() - q.log.Debugf("Revoked state pipeline " + - "flushed.") - - return - } - - case <-q.forceQuit: - q.queueCond.L.Unlock() - q.log.Debugf("Revoked state pipeline force " + - "quit.") - - return - - default: - } - } - - // Pop the first element from the queue. - e := q.queue.Front() - - //nolint:forcetypeassert - task := q.queue.Remove(e).(*wtdb.BackupID) - q.queueCond.L.Unlock() - - select { - - // Backup task submitted to dispatcher. We don't select on quit - // to ensure that we still drain tasks while shutting down. - case q.newBackupTasks <- task: - - // Force quit, return immediately to allow the client to exit. 
- case <-q.forceQuit: - q.log.Debugf("Revoked state pipeline force quit.") - return - } - } -} - -// signalUntilShutdown strobes the queue's condition variable to ensure the -// queueManager reliably unblocks to check for the exit condition. -func (q *taskPipeline) signalUntilShutdown() { - for { - select { - case <-time.After(time.Millisecond): - q.queueCond.Signal() - case <-q.shutdown: - return - } - } -}
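
The patch swaps the purely in-memory taskPipeline for a generic DiskOverflowQueue keyed by the policy's blob type, bounds the number of backup tasks held in memory via the new MaxTasksInMemQueue config field, and spills anything beyond that bound to the watchtower client database (via cfg.DB.GetDBQueue). The queue implementation itself is not part of these hunks, so the following is only a minimal sketch of the overflow pattern under assumed semantics; every name in it (diskQueue, overflowQueue, Push, PopUpTo, Len) is hypothetical and not the lnd API.

package overflowqueue

import "sync"

// diskQueue is the persistence layer this sketch assumes. In the diff that
// role is played by the queue returned from cfg.DB.GetDBQueue; the method set
// below is illustrative only.
type diskQueue[T any] interface {
	// Push appends items to the tail of the persisted queue.
	Push(items ...T) error

	// PopUpTo removes and returns up to max items from the head of the
	// persisted queue.
	PopUpTo(max int) ([]T, error)

	// Len reports how many items are currently persisted.
	Len() (uint64, error)
}

// overflowQueue keeps at most maxMem items in memory and spills the rest to
// disk, which is the general shape of the DiskOverflowQueue introduced by the
// diff (names and semantics here are simplified assumptions).
type overflowQueue[T any] struct {
	mu     sync.Mutex
	mem    []T // bounded in-memory FIFO
	maxMem int
	disk   diskQueue[T]
}

// Enqueue adds an item, overflowing to disk once the in-memory bound is hit.
// Once anything lives on disk, new items also go to disk so that overall FIFO
// order is preserved.
func (q *overflowQueue[T]) Enqueue(item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	onDisk, err := q.disk.Len()
	if err != nil {
		return err
	}
	if onDisk > 0 || len(q.mem) >= q.maxMem {
		return q.disk.Push(item)
	}

	q.mem = append(q.mem, item)

	return nil
}

// Dequeue returns the next item in FIFO order, refilling the in-memory buffer
// from disk when it runs dry. The boolean is false if the queue is currently
// empty.
func (q *overflowQueue[T]) Dequeue() (T, bool, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	var zero T
	if len(q.mem) == 0 {
		items, err := q.disk.PopUpTo(q.maxMem)
		if err != nil {
			return zero, false, err
		}
		q.mem = items
	}
	if len(q.mem) == 0 {
		return zero, false, nil
	}

	item := q.mem[0]
	q.mem = q.mem[1:]

	return item, true, nil
}

// Stop persists anything still buffered in memory so that queued items
// survive a restart, which is the behaviour the rewritten client test
// ("in-mem updates not lost on restart") relies on. For brevity this appends
// to the disk queue; preserving exact order relative to items already on disk
// would need a push-to-front primitive.
func (q *overflowQueue[T]) Stop() error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.mem) == 0 {
		return nil
	}
	if err := q.disk.Push(q.mem...); err != nil {
		return err
	}
	q.mem = nil

	return nil
}

The real type in watchtower/wtclient additionally exposes the producer method QueueBackupID and the channel-based NextBackupID consumer used by backupDispatcher; only the overflow bookkeeping is sketched here.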
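
TowerClient.Stop now surfaces a shutdown error from inside its sync.Once body by assigning to a variable declared before the Do call (ForceQuit logs the same error instead). A standalone illustration of that pattern, with hypothetical types standing in for the client and its pipeline:

package stoponce

import "sync"

// stopper stands in for any dependency whose shutdown can fail, such as the
// client's backup pipeline.
type stopper interface {
	Stop() error
}

type client struct {
	stopped sync.Once
	queue   stopper
}

// Stop runs its body at most once; an error returned by the queue's shutdown
// is captured in returnErr and handed back to the caller, mirroring the shape
// of TowerClient.Stop in the diff.
func (c *client) Stop() error {
	var returnErr error
	c.stopped.Do(func() {
		if err := c.queue.Stop(); err != nil {
			returnErr = err
		}
	})

	return returnErr
}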
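
backupDispatcher now reads tasks with `task, ok := <-c.pipeline.NextBackupID()`, i.e. the queue is consumed as a channel that is closed once a clean drain completes. One way to layer such a channel over a poll-style queue is a small feeder goroutine; the sketch below illustrates the general technique, not the actual DiskOverflowQueue internals, and it polls for simplicity where the real code would block on internal signalling.

package feeder

import "time"

// feed drains a poll-style dequeue function into a channel that a dispatcher
// can select on. The channel is closed once the queue reports empty after
// quit has been signalled, so the consumer can detect a clean drain with the
// `item, ok := <-ch` idiom.
func feed[T any](dequeue func() (T, bool, error),
	quit <-chan struct{}) <-chan T {

	out := make(chan T)
	go func() {
		defer close(out)

		for {
			item, ok, err := dequeue()
			if err != nil {
				return
			}
			if !ok {
				// Queue is empty: exit on shutdown, otherwise
				// poll again shortly. A real implementation
				// would block on a notification instead.
				select {
				case <-quit:
					return
				case <-time.After(10 * time.Millisecond):
				}
				continue
			}

			// Deliver the item. We deliberately do not abandon
			// delivery on quit, so everything already queued is
			// drained before the channel closes; the consumer
			// keeps reading until the channel is closed.
			out <- item
		}
	}()

	return out
}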