From 9157c88f9328f5a2a5b76aed6b5156a84b9ff5d1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 23 May 2019 20:49:18 -0700 Subject: [PATCH] watchtower/wtclient: dedup backups across restarts Now that the committed and acked updates are persisted across restarts, we will use them to filter out duplicate commit heights presented by the client. --- watchtower/wtclient/client.go | 70 +++++++++++++++++++++++++++--- watchtower/wtclient/client_test.go | 49 +++++++++++++++++++++ 2 files changed, 113 insertions(+), 6 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 6a0375708..8f0cbc9f6 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -150,8 +150,9 @@ type TowerClient struct { sessionQueue *sessionQueue prevTask *backupTask - summaryMu sync.RWMutex - summaries wtdb.ChannelSummaries + backupMu sync.Mutex + summaries wtdb.ChannelSummaries + chanCommitHeights map[lnwire.ChannelID]uint64 statTicker *time.Ticker stats clientStats @@ -243,6 +244,10 @@ func New(config *Config) (*TowerClient, error) { s.SessionPrivKey = sessionPriv } + // Reconstruct the highest commit height processed for each channel + // under the client's current policy. + c.buildHighestCommitHeights() + // Finally, load the sweep pkscripts that have been generated for all // previously registered channels. c.summaries, err = c.cfg.DB.FetchChanSummaries() @@ -253,6 +258,44 @@ func New(config *Config) (*TowerClient, error) { return c, nil } +// buildHighestCommitHeights inspects the full set of candidate client sessions +// loaded from disk, and determines the highest known commit height for each +// channel. This allows the client to reject backups that it has already +// processed for it's active policy. +func (c *TowerClient) buildHighestCommitHeights() { + chanCommitHeights := make(map[lnwire.ChannelID]uint64) + for _, s := range c.candidateSessions { + // We only want to consider accepted updates that have been + // accepted under an identical policy to the client's current + // policy. + if s.Policy != c.cfg.Policy { + continue + } + + // Take the highest commit height found in the session's + // committed updates. + for _, committedUpdate := range s.CommittedUpdates { + bid := committedUpdate.BackupID + + height, ok := chanCommitHeights[bid.ChanID] + if !ok || bid.CommitHeight > height { + chanCommitHeights[bid.ChanID] = bid.CommitHeight + } + } + + // Take the heights commit height found in the session's acked + // updates. + for _, bid := range s.AckedUpdates { + height, ok := chanCommitHeights[bid.ChanID] + if !ok || bid.CommitHeight > height { + chanCommitHeights[bid.ChanID] = bid.CommitHeight + } + } + } + + c.chanCommitHeights = chanCommitHeights +} + // Start initializes the watchtower client by loading or negotiating an active // session and then begins processing backup tasks from the request pipeline. func (c *TowerClient) Start() error { @@ -388,8 +431,8 @@ func (c *TowerClient) ForceQuit() { // within the client. This should be called during link startup to ensure that // the client is able to support the link during operation. func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { - c.summaryMu.Lock() - defer c.summaryMu.Unlock() + c.backupMu.Lock() + defer c.backupMu.Unlock() // If a pkscript for this channel already exists, the channel has been // previously registered. @@ -431,13 +474,28 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, breachInfo *lnwallet.BreachRetribution) error { // Retrieve the cached sweep pkscript used for this channel. - c.summaryMu.RLock() + c.backupMu.Lock() summary, ok := c.summaries[*chanID] - c.summaryMu.RUnlock() if !ok { + c.backupMu.Unlock() return ErrUnregisteredChannel } + // Ignore backups that have already been presented to the client. + height, ok := c.chanCommitHeights[*chanID] + if ok && breachInfo.RevokedStateNum <= height { + c.backupMu.Unlock() + log.Debugf("Ignoring duplicate backup for chanid=%v at height=%d", + chanID, breachInfo.RevokedStateNum) + return nil + } + + // This backup has a higher commit height than any known backup for this + // channel. We'll update our tip so that we won't accept it again if the + // link flaps. + c.chanCommitHeights[*chanID] = breachInfo.RevokedStateNum + c.backupMu.Unlock() + task := newBackupTask(chanID, breachInfo, summary.SweepPkScript) return c.pipeline.QueueBackupTask(task) diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index ac4ebf2db..b5a9bbbd3 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -1246,6 +1246,55 @@ var clientTests = []clientTest{ h.assertUpdatesForPolicy(hints, h.clientCfg.Policy) }, }, + { + // Asserts that the client will deduplicate backups presented by + // a channel both in memory and after a restart. The client + // should only accept backups with a commit height greater than + // any processed already processed for a given policy. + name: "dedup backups", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + BlobType: blob.TypeDefault, + MaxUpdates: 5, + SweepFeeRate: 1, + }, + }, + fn: func(h *testHarness) { + const ( + numUpdates = 10 + chanID = 0 + ) + + // Generate the retributions that will be backed up. + hints := h.advanceChannelN(chanID, numUpdates) + + // Queue the first half of the retributions twice, the + // second batch should be entirely deduped by the + // client's in-memory tracking. + h.backupStates(chanID, 0, numUpdates/2, nil) + h.backupStates(chanID, 0, numUpdates/2, nil) + + // Wait for the first half of the updates to be + // populated in the server's database. + h.waitServerUpdates(hints[:len(hints)/2], 5*time.Second) + + // Restart the client, so we can ensure the deduping is + // maintained across restarts. + h.client.Stop() + h.startClient() + defer h.client.ForceQuit() + + // Try to back up the full range of retributions. Only + // the second half should actually be sent. + h.backupStates(chanID, 0, numUpdates, nil) + + // Wait for all of the updates to be populated in the + // server's database. + h.waitServerUpdates(hints, 5*time.Second) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup