watchtower: replace taskpipeline with disk overflow queue

Elle Mouton
2023-03-22 14:30:16 +02:00
parent e91fe50878
commit 56cd825695
4 changed files with 72 additions and 280 deletions
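The diff below replaces the watchtower client's purely in-memory taskPipeline with a DiskOverflowQueue: up to MaxTasksInMemQueue backup IDs are buffered in memory, and anything beyond that overflows to the tower client database, so queued backups survive a restart. The DiskOverflowQueue implementation itself is added in a separate file that is not part of this diff, so what follows is only a minimal, self-contained sketch of the two-tier idea; every name in it (overflowQueue, diskStore, and so on) is hypothetical, the "disk" tier is stubbed with a slice, and none of it should be read as lnd's actual code.

package main

import (
    "fmt"
    "sync"
)

// diskStore stands in for a durable FIFO bucket (e.g. a DB-backed queue). It
// is a plain slice here purely to keep the sketch runnable.
type diskStore[T any] struct {
    items []T
}

func (d *diskStore[T]) push(item T) { d.items = append(d.items, item) }
func (d *diskStore[T]) len() int    { return len(d.items) }

func (d *diskStore[T]) pop() T {
    item := d.items[0]
    d.items = d.items[1:]
    return item
}

// overflowQueue keeps at most maxInMem items in memory and spills the rest to
// the backing store, preserving FIFO order across both tiers.
type overflowQueue[T any] struct {
    mu       sync.Mutex
    maxInMem int
    mem      []T
    disk     *diskStore[T]
}

func newOverflowQueue[T any](maxInMem int) *overflowQueue[T] {
    return &overflowQueue[T]{maxInMem: maxInMem, disk: &diskStore[T]{}}
}

// enqueue adds an item. Once anything has spilled to disk, new items must also
// go to disk, otherwise ordering would break.
func (q *overflowQueue[T]) enqueue(item T) {
    q.mu.Lock()
    defer q.mu.Unlock()

    if q.disk.len() > 0 || len(q.mem) >= q.maxInMem {
        q.disk.push(item)
        return
    }
    q.mem = append(q.mem, item)
}

// dequeue pops the oldest item and tops the in-memory tier back up from disk.
func (q *overflowQueue[T]) dequeue() (T, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()

    var zero T
    if len(q.mem) == 0 && q.disk.len() == 0 {
        return zero, false
    }

    var item T
    if len(q.mem) > 0 {
        // Memory always holds the oldest items, so serve from it first.
        item = q.mem[0]
        q.mem = q.mem[1:]
    } else {
        item = q.disk.pop()
    }

    // Refill the in-memory tier from the overflow tier.
    for len(q.mem) < q.maxInMem && q.disk.len() > 0 {
        q.mem = append(q.mem, q.disk.pop())
    }

    return item, true
}

func main() {
    q := newOverflowQueue[int](2)
    for i := 0; i < 5; i++ {
        q.enqueue(i) // items 2..4 overflow to the stub "disk"
    }
    for {
        item, ok := q.dequeue()
        if !ok {
            return
        }
        fmt.Println(item) // prints 0 1 2 3 4, in order
    }
}

The property the sketch tries to preserve is FIFO ordering across the two tiers: once anything has spilled, new items keep going to the overflow tier until the in-memory buffer has been topped back up from it.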

View File

@@ -1568,17 +1568,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
             return s.channelNotifier.
                 SubscribeChannelEvents()
         },
         Signer:             cc.Wallet.Cfg.Signer,
         NewAddress:         newSweepPkScriptGen(cc.Wallet),
         SecretKeyRing:      s.cc.KeyRing,
         Dial:               cfg.net.Dial,
         AuthDial:           authDial,
         DB:                 dbs.TowerClientDB,
         Policy:             policy,
         ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
         MinBackoff:         10 * time.Second,
         MaxBackoff:         5 * time.Minute,
         ForceQuitDelay:     wtclient.DefaultForceQuitDelay,
+        MaxTasksInMemQueue: wtclient.DefaultMaxTasksInMemQueue,
     })
     if err != nil {
         return nil, err
@@ -1601,17 +1602,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
             return s.channelNotifier.
                 SubscribeChannelEvents()
         },
         Signer:             cc.Wallet.Cfg.Signer,
         NewAddress:         newSweepPkScriptGen(cc.Wallet),
         SecretKeyRing:      s.cc.KeyRing,
         Dial:               cfg.net.Dial,
         AuthDial:           authDial,
         DB:                 dbs.TowerClientDB,
         Policy:             anchorPolicy,
         ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
         MinBackoff:         10 * time.Second,
         MaxBackoff:         5 * time.Minute,
         ForceQuitDelay:     wtclient.DefaultForceQuitDelay,
+        MaxTasksInMemQueue: wtclient.DefaultMaxTasksInMemQueue,
     })
     if err != nil {
         return nil, err

View File

@@ -244,6 +244,10 @@ type Config struct {
     // number of blocks to delay closing a session after its last channel
     // has been closed.
     SessionCloseRange uint32

+    // MaxTasksInMemQueue is the maximum number of backup tasks that should
+    // be kept in-memory. Any more tasks will overflow to disk.
+    MaxTasksInMemQueue uint64
 }

 // BreachRetributionBuilder is a function that can be used to construct a
@@ -297,7 +301,7 @@ type TowerClient struct {
     log btclog.Logger

-    pipeline        *taskPipeline
+    pipeline        *DiskOverflowQueue[*wtdb.BackupID]
     negotiator      SessionNegotiator
     candidateTowers TowerCandidateIterator
@@ -359,10 +363,21 @@ func New(config *Config) (*TowerClient, error) {
         return nil, err
     }

+    var (
+        policy  = cfg.Policy.BlobType.String()
+        queueDB = cfg.DB.GetDBQueue([]byte(policy))
+    )
+    queue, err := NewDiskOverflowQueue[*wtdb.BackupID](
+        queueDB, cfg.MaxTasksInMemQueue, plog,
+    )
+    if err != nil {
+        return nil, err
+    }
+
     c := &TowerClient{
         cfg:               cfg,
         log:               plog,
-        pipeline:          newTaskPipeline(plog),
+        pipeline:          queue,
         chanCommitHeights: make(map[lnwire.ChannelID]uint64),
         activeSessions:    make(sessionQueueSet),
         summaries:         chanSummaries,
@@ -675,6 +690,7 @@ func (c *TowerClient) Start() error {

 // Stop idempotently initiates a graceful shutdown of the watchtower client.
 func (c *TowerClient) Stop() error {
+    var returnErr error
     c.stopped.Do(func() {
         c.log.Debugf("Stopping watchtower client")
@@ -697,7 +713,10 @@ func (c *TowerClient) Stop() error {
         // updates from being accepted. In practice, the links should be
         // shutdown before the client has been stopped, so all updates
         // would have been added prior.
-        c.pipeline.Stop()
+        err := c.pipeline.Stop()
+        if err != nil {
+            returnErr = err
+        }

         // 3. Once the backup queue has shutdown, wait for the main
         //    dispatcher to exit. The backup queue will signal it's
@@ -728,7 +747,8 @@ func (c *TowerClient) Stop() error {
         c.log.Debugf("Client successfully stopped, stats: %s", c.stats)
     })
-    return nil
+
+    return returnErr
 }

 // ForceQuit idempotently initiates an unclean shutdown of the watchtower
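One detail of the Stop hunks above is how the error from the queue's Stop is surfaced: sync.Once.Do takes a plain func() with no return value, so the error is captured in a returnErr variable declared outside the closure and returned after Do completes. A tiny standalone sketch of that pattern, with purely illustrative names:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// stopper mirrors the shape of the shutdown path above: an idempotent Stop
// guarded by sync.Once that still propagates the inner shutdown error.
type stopper struct {
    stopped sync.Once
    inner   func() error // e.g. the backup queue's Stop
}

func (s *stopper) Stop() error {
    var returnErr error
    s.stopped.Do(func() {
        if err := s.inner(); err != nil {
            returnErr = err
        }
    })

    // Later calls skip the closure entirely and report nil.
    return returnErr
}

func main() {
    s := &stopper{inner: func() error { return errors.New("queue not drained") }}
    fmt.Println(s.Stop()) // first call surfaces the inner error
    fmt.Println(s.Stop()) // subsequent calls are no-ops
}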
@@ -741,7 +761,10 @@ func (c *TowerClient) ForceQuit() {
         // updates from being accepted. In practice, the links should be
         // shutdown before the client has been stopped, so all updates
         // would have been added prior.
-        c.pipeline.ForceQuit()
+        err := c.pipeline.Stop()
+        if err != nil {
+            c.log.Errorf("could not stop backup queue: %v", err)
+        }

         // 2. Once the backup queue has shutdown, wait for the main
         //    dispatcher to exit. The backup queue will signal it's
@@ -840,7 +863,7 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
         CommitHeight: stateNum,
     }

-    return c.pipeline.QueueBackupTask(id)
+    return c.pipeline.QueueBackupID(id)
 }

 // nextSessionQueue attempts to fetch an active session from our set of
@@ -1327,7 +1350,7 @@ func (c *TowerClient) backupDispatcher() {
         // Process each backup task serially from the queue of
         // revoked states.
-        case task, ok := <-c.pipeline.NewBackupTasks():
+        case task, ok := <-c.pipeline.NextBackupID():

             // All backups in the pipeline have been
             // processed, it is now safe to exit.
             if !ok {
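The dispatcher hunk above keeps the existing convention that a closed task channel means the pipeline has been stopped and fully drained: the ok flag of the two-value receive tells backupDispatcher when it is safe to exit. A minimal, standalone illustration of that idiom (not lnd code):

package main

import "fmt"

func main() {
    tasks := make(chan int)

    // Producer: deliver every queued task, then close the channel to
    // signal that the queue has been fully drained.
    go func() {
        defer close(tasks)
        for i := 0; i < 3; i++ {
            tasks <- i
        }
    }()

    // Consumer: the two-value receive distinguishes "here is a task" from
    // "the queue is closed and drained, exit now".
    for {
        task, ok := <-tasks
        if !ok {
            fmt.Println("pipeline drained, safe to exit")
            return
        }
        fmt.Println("dispatching backup", task)
    }
}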
@@ -1639,8 +1662,6 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error {
     }:
     case <-c.pipeline.quit:
         return ErrClientExiting
-    case <-c.pipeline.forceQuit:
-        return ErrClientExiting
     }

     select {
@@ -1648,8 +1669,6 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error {
         return err
     case <-c.pipeline.quit:
         return ErrClientExiting
-    case <-c.pipeline.forceQuit:
-        return ErrClientExiting
     }
 }
@@ -1706,8 +1725,6 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey,
     }:
     case <-c.pipeline.quit:
         return ErrClientExiting
-    case <-c.pipeline.forceQuit:
-        return ErrClientExiting
     }

     select {
@@ -1715,8 +1732,6 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey,
         return err
     case <-c.pipeline.quit:
         return ErrClientExiting
-    case <-c.pipeline.forceQuit:
-        return ErrClientExiting
     }
 }

View File

@@ -509,12 +509,13 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
         NewAddress: func() ([]byte, error) {
             return addrScript, nil
         },
         ReadTimeout:        timeout,
         WriteTimeout:       timeout,
         MinBackoff:         time.Millisecond,
         MaxBackoff:         time.Second,
         ForceQuitDelay:     10 * time.Second,
         SessionCloseRange:  1,
+        MaxTasksInMemQueue: 2,
     }

     h.clientCfg.BuildBreachRetribution = func(id lnwire.ChannelID,
@@ -1094,10 +1095,6 @@ var clientTests = []clientTest{
             hints := h.advanceChannelN(chanID, numUpdates)
             h.backupStates(chanID, 0, numUpdates, nil)

-            // Stop the client in the background, to assert the
-            // pipeline is always flushed before it exits.
-            go h.client.Stop()
-
             // Wait for all the updates to be populated in the
             // server's database.
             h.waitServerUpdates(hints, time.Second)
@@ -1238,10 +1235,6 @@ var clientTests = []clientTest{
             // Now, queue the retributions for backup.
             h.backupStates(chanID, 0, numUpdates, nil)

-            // Stop the client in the background, to assert the
-            // pipeline is always flushed before it exits.
-            go h.client.Stop()
-
             // Give the client time to saturate a large number of
             // session queues for which the server has not acked the
             // state updates that it has received.
@@ -1346,9 +1339,6 @@ var clientTests = []clientTest{
                 h.backupStates(id, 0, numUpdates, nil)
             }

-            // Test reliable flush under multi-client scenario.
-            go h.client.Stop()
-
             // Wait for all the updates to be populated in the
             // server's database.
             h.waitServerUpdates(hints, 10*time.Second)
@@ -1395,9 +1385,6 @@ var clientTests = []clientTest{
             // identical one.
             h.startClient()

-            // Now, queue the retributions for backup.
-            h.backupStates(chanID, 0, numUpdates, nil)
-
             // Wait for all the updates to be populated in the
             // server's database.
             h.waitServerUpdates(hints, waitTime)
@@ -1449,9 +1436,6 @@ var clientTests = []clientTest{
             h.clientCfg.Policy.SweepFeeRate *= 2
             h.startClient()

-            // Now, queue the retributions for backup.
-            h.backupStates(chanID, 0, numUpdates, nil)
-
             // Wait for all the updates to be populated in the
             // server's database.
             h.waitServerUpdates(hints, waitTime)
@@ -2090,9 +2074,9 @@ var clientTests = []clientTest{
     },
     {
         // This test demonstrates that if there is no active session,
-        // the updates are queued in memory and then lost on restart.
-        // This behaviour will be fixed in upcoming commits.
-        name: "lose updates in task pipeline on restart",
+        // the updates are persisted to disk on restart and reliably
+        // sent.
+        name: "in-mem updates not lost on restart",
         cfg: harnessCfg{
             localBalance:  localBalance,
             remoteBalance: remoteBalance,
@@ -2112,36 +2096,25 @@ var clientTests = []clientTest{
                 numUpdates = 5
             )

-            // Advance the channel to create all states.
+            // Try back up the first few states of the client's
+            // channel. Since the server has not yet started, the
+            // client should have no active session yet and so these
+            // updates will just be kept in an in-memory queue.
             hints := h.advanceChannelN(chanID, numUpdates)
-            firstBatch := hints[:numUpdates/2]
-            secondBatch := hints[numUpdates/2 : numUpdates]
-
-            // Attempt to back up the first batch of states of the
-            // client's channel. Since the server has not yet
-            // started, the client should have no active session
-            // yet and so these updates will just be kept in an
-            // in-memory queue.
             h.backupStates(chanID, 0, numUpdates/2, nil)

             // Restart the Client (force quit). And also now start
-            // the server. The client should now be able to create
-            // a session with the server.
+            // the server.
             h.client.ForceQuit()
             h.startServer()
             h.startClient()

-            // Attempt to now back up the second batch of states.
+            // Back up a few more states.
             h.backupStates(chanID, numUpdates/2, numUpdates, nil)

-            // Assert that the server does receive the updates.
-            h.waitServerUpdates(secondBatch, waitTime)
-
-            // Assert that the server definitely still has not
-            // received the initial set of updates.
-            matches, err := h.serverDB.QueryMatches(firstBatch)
-            require.NoError(h.t, err)
-            require.Empty(h.t, matches)
+            // Assert that the server does receive ALL the updates.
+            h.waitServerUpdates(hints[0:numUpdates], waitTime)
         },
     },
 }

View File

@@ -1,198 +0,0 @@
package wtclient

import (
    "container/list"
    "sync"
    "time"

    "github.com/btcsuite/btclog"
    "github.com/lightningnetwork/lnd/watchtower/wtdb"
)
// taskPipeline implements a reliable, in-order queue that ensures its queue is
// fully drained before exiting. Stopping the taskPipeline prevents the pipeline
// from accepting any further tasks, and will cause the pipeline to exit after
// all updates have been delivered to the downstream receiver. If this process
// hangs and is unable to make progress, users can optionally call ForceQuit to
// abandon the reliable draining of the queue in order to permit shutdown.
type taskPipeline struct {
    started sync.Once
    stopped sync.Once
    forced  sync.Once

    log btclog.Logger

    queueMtx  sync.Mutex
    queueCond *sync.Cond
    queue     *list.List

    newBackupTasks chan *wtdb.BackupID

    quit      chan struct{}
    forceQuit chan struct{}
    shutdown  chan struct{}
}

// newTaskPipeline initializes a new taskPipeline.
func newTaskPipeline(log btclog.Logger) *taskPipeline {
    rq := &taskPipeline{
        log:            log,
        queue:          list.New(),
        newBackupTasks: make(chan *wtdb.BackupID),
        quit:           make(chan struct{}),
        forceQuit:      make(chan struct{}),
        shutdown:       make(chan struct{}),
    }
    rq.queueCond = sync.NewCond(&rq.queueMtx)

    return rq
}

// Start spins up the taskPipeline, making it eligible to begin receiving backup
// tasks and deliver them to the receiver of NewBackupTasks.
func (q *taskPipeline) Start() {
    q.started.Do(func() {
        go q.queueManager()
    })
}

// Stop begins a graceful shutdown of the taskPipeline. This method returns once
// all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes
// the delivery of pending tasks to be interrupted.
func (q *taskPipeline) Stop() {
    q.stopped.Do(func() {
        q.log.Debugf("Stopping task pipeline")

        close(q.quit)
        q.signalUntilShutdown()

        // Skip log if we also force quit.
        select {
        case <-q.forceQuit:
        default:
            q.log.Debugf("Task pipeline stopped successfully")
        }
    })
}

// ForceQuit signals the taskPipeline to immediately exit, dropping any
// backupTasks that have not been delivered via NewBackupTasks.
func (q *taskPipeline) ForceQuit() {
    q.forced.Do(func() {
        q.log.Infof("Force quitting task pipeline")

        close(q.forceQuit)
        q.signalUntilShutdown()

        q.log.Infof("Task pipeline unclean shutdown complete")
    })
}
// NewBackupTasks returns a read-only channel for enqueued backupTasks. The
// channel will be closed after a call to Stop and all pending tasks have been
// delivered, or if a call to ForceQuit is made before the pending entries
// have been drained.
func (q *taskPipeline) NewBackupTasks() <-chan *wtdb.BackupID {
    return q.newBackupTasks
}
// QueueBackupTask enqueues a backupTask for reliable delivery to the consumer
// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is
// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be
// delivered via NewBackupTasks unless ForceQuit is called before completion.
func (q *taskPipeline) QueueBackupTask(task *wtdb.BackupID) error {
    q.queueCond.L.Lock()
    select {

    // Reject new tasks after quit has been signaled.
    case <-q.quit:
        q.queueCond.L.Unlock()
        return ErrClientExiting

    // Reject new tasks after force quit has been signaled.
    case <-q.forceQuit:
        q.queueCond.L.Unlock()
        return ErrClientExiting

    default:
    }

    // Queue the new task and signal the queue's condition variable to wake
    // up the queueManager for processing.
    q.queue.PushBack(task)
    q.queueCond.L.Unlock()

    q.queueCond.Signal()

    return nil
}
// queueManager processes all incoming backup requests that get added via
// QueueBackupTask. The manager exits once the pipeline has been stopped and
// the queue fully drained, or immediately after a force quit.
//
// NOTE: This method MUST be run as a goroutine.
func (q *taskPipeline) queueManager() {
    defer close(q.shutdown)
    defer close(q.newBackupTasks)

    for {
        q.queueCond.L.Lock()
        for q.queue.Front() == nil {
            q.queueCond.Wait()

            select {
            case <-q.quit:
                // Exit only after the queue has been fully
                // drained.
                if q.queue.Len() == 0 {
                    q.queueCond.L.Unlock()
                    q.log.Debugf("Revoked state pipeline " +
                        "flushed.")
                    return
                }

            case <-q.forceQuit:
                q.queueCond.L.Unlock()
                q.log.Debugf("Revoked state pipeline force " +
                    "quit.")
                return

            default:
            }
        }

        // Pop the first element from the queue.
        e := q.queue.Front()

        //nolint:forcetypeassert
        task := q.queue.Remove(e).(*wtdb.BackupID)
        q.queueCond.L.Unlock()

        select {

        // Backup task submitted to dispatcher. We don't select on quit
        // to ensure that we still drain tasks while shutting down.
        case q.newBackupTasks <- task:

        // Force quit, return immediately to allow the client to exit.
        case <-q.forceQuit:
            q.log.Debugf("Revoked state pipeline force quit.")
            return
        }
    }
}

// signalUntilShutdown strobes the queue's condition variable to ensure the
// queueManager reliably unblocks to check for the exit condition.
func (q *taskPipeline) signalUntilShutdown() {
    for {
        select {
        case <-time.After(time.Millisecond):
            q.queueCond.Signal()
        case <-q.shutdown:
            return
        }
    }
}