From b35a5b8892c6c4fd6824b70e252f98fe1caf104e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 23 May 2019 20:48:50 -0700 Subject: [PATCH] watchtower/wtclient: integrate ClientChannelSummaries In this commit, we utilize the more generic ClientChanSummary instead of exposing methods that only allow us to set and fetch sweep pkscripts. --- watchtower/wtclient/client.go | 26 +++++++++-------- watchtower/wtclient/client_test.go | 2 ++ watchtower/wtclient/interface.go | 17 ++++++----- watchtower/wtdb/client_chan_summary.go | 7 +++++ watchtower/wtmock/client_db.go | 39 ++++++++++++++++---------- 5 files changed, 57 insertions(+), 34 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 1e614cc29..6a0375708 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -150,8 +150,8 @@ type TowerClient struct { sessionQueue *sessionQueue prevTask *backupTask - sweepPkScriptMu sync.RWMutex - sweepPkScripts map[lnwire.ChannelID][]byte + summaryMu sync.RWMutex + summaries wtdb.ChannelSummaries statTicker *time.Ticker stats clientStats @@ -245,7 +245,7 @@ func New(config *Config) (*TowerClient, error) { // Finally, load the sweep pkscripts that have been generated for all // previously registered channels. - c.sweepPkScripts, err = c.cfg.DB.FetchChanPkScripts() + c.summaries, err = c.cfg.DB.FetchChanSummaries() if err != nil { return nil, err } @@ -388,12 +388,12 @@ func (c *TowerClient) ForceQuit() { // within the client. This should be called during link startup to ensure that // the client is able to support the link during operation. func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { - c.sweepPkScriptMu.Lock() - defer c.sweepPkScriptMu.Unlock() + c.summaryMu.Lock() + defer c.summaryMu.Unlock() // If a pkscript for this channel already exists, the channel has been // previously registered. - if _, ok := c.sweepPkScripts[chanID]; ok { + if _, ok := c.summaries[chanID]; ok { return nil } @@ -406,14 +406,16 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { // Persist the sweep pkscript so that restarts will not introduce // address inflation when the channel is reregistered after a restart. - err = c.cfg.DB.AddChanPkScript(chanID, pkScript) + err = c.cfg.DB.RegisterChannel(chanID, pkScript) if err != nil { return err } // Finally, cache the pkscript in our in-memory cache to avoid db // lookups for the remainder of the daemon's execution. - c.sweepPkScripts[chanID] = pkScript + c.summaries[chanID] = wtdb.ClientChanSummary{ + SweepPkScript: pkScript, + } return nil } @@ -429,14 +431,14 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, breachInfo *lnwallet.BreachRetribution) error { // Retrieve the cached sweep pkscript used for this channel. - c.sweepPkScriptMu.RLock() - sweepPkScript, ok := c.sweepPkScripts[*chanID] - c.sweepPkScriptMu.RUnlock() + c.summaryMu.RLock() + summary, ok := c.summaries[*chanID] + c.summaryMu.RUnlock() if !ok { return ErrUnregisteredChannel } - task := newBackupTask(chanID, breachInfo, sweepPkScript) + task := newBackupTask(chanID, breachInfo, summary.SweepPkScript) return c.pipeline.QueueBackupTask(task) } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 86811bf02..ac4ebf2db 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -605,6 +605,8 @@ func (h *testHarness) backupStates(id, from, to uint64, expErr error) { // backupStates instructs the channel identified by id to send a backup for // state i. func (h *testHarness) backupState(id, i uint64, expErr error) { + h.t.Helper() + _, retribution := h.channel(id).getState(i) chanID := chanIDFromInt(id) diff --git a/watchtower/wtclient/interface.go b/watchtower/wtclient/interface.go index 4de81acb9..e8a8b865f 100644 --- a/watchtower/wtclient/interface.go +++ b/watchtower/wtclient/interface.go @@ -41,14 +41,17 @@ type DB interface { // still be able to accept state updates. ListClientSessions() (map[wtdb.SessionID]*wtdb.ClientSession, error) - // FetchChanPkScripts returns a map of all sweep pkscripts for - // registered channels. This is used on startup to cache the sweep - // pkscripts of registered channels in memory. - FetchChanPkScripts() (map[lnwire.ChannelID][]byte, error) + // FetchChanSummaries loads a mapping from all registered channels to + // their channel summaries. + FetchChanSummaries() (wtdb.ChannelSummaries, error) - // AddChanPkScript inserts a newly generated sweep pkscript for the - // given channel. - AddChanPkScript(lnwire.ChannelID, []byte) error + // RegisterChannel registers a channel for use within the client + // database. For now, all that is stored in the channel summary is the + // sweep pkscript that we'd like any tower sweeps to pay into. In the + // future, this will be extended to contain more info to allow the + // client efficiently request historical states to be backed up under + // the client's active policy. + RegisterChannel(lnwire.ChannelID, []byte) error // MarkBackupIneligible records that the state identified by the // (channel id, commit height) tuple was ineligible for being backed up diff --git a/watchtower/wtdb/client_chan_summary.go b/watchtower/wtdb/client_chan_summary.go index d4b3c3c38..0925150a2 100644 --- a/watchtower/wtdb/client_chan_summary.go +++ b/watchtower/wtdb/client_chan_summary.go @@ -1,11 +1,18 @@ package wtdb import ( + "errors" "io" "github.com/lightningnetwork/lnd/lnwire" ) +var ( + // ErrChannelAlreadyRegistered signals a duplicate attempt to + // register a channel with the client database. + ErrChannelAlreadyRegistered = errors.New("channel already registered") +) + // ChannelSummaries is a map for a given channel id to it's ClientChanSummary. type ChannelSummaries map[lnwire.ChannelID]ClientChanSummary diff --git a/watchtower/wtmock/client_db.go b/watchtower/wtmock/client_db.go index b903e78a9..88cde50fa 100644 --- a/watchtower/wtmock/client_db.go +++ b/watchtower/wtmock/client_db.go @@ -1,7 +1,6 @@ package wtmock import ( - "fmt" "net" "sync" "sync/atomic" @@ -18,7 +17,7 @@ type ClientDB struct { nextTowerID uint64 // to be used atomically mu sync.Mutex - sweepPkScripts map[lnwire.ChannelID][]byte + summaries map[lnwire.ChannelID]wtdb.ClientChanSummary activeSessions map[wtdb.SessionID]*wtdb.ClientSession towerIndex map[towerPK]wtdb.TowerID towers map[wtdb.TowerID]*wtdb.Tower @@ -30,7 +29,7 @@ type ClientDB struct { // NewClientDB initializes a new mock ClientDB. func NewClientDB() *ClientDB { return &ClientDB{ - sweepPkScripts: make(map[lnwire.ChannelID][]byte), + summaries: make(map[lnwire.ChannelID]wtdb.ClientChanSummary), activeSessions: make(map[wtdb.SessionID]*wtdb.ClientSession), towerIndex: make(map[towerPK]wtdb.TowerID), towers: make(map[wtdb.TowerID]*wtdb.Tower), @@ -252,30 +251,40 @@ func (m *ClientDB) AckUpdate(id *wtdb.SessionID, seqNum, lastApplied uint16) err return wtdb.ErrCommittedUpdateNotFound } -// FetchChanPkScripts returns the set of sweep pkscripts known for all channels. -// This allows the client to cache them in memory on startup. -func (m *ClientDB) FetchChanPkScripts() (map[lnwire.ChannelID][]byte, error) { +// FetchChanSummaries loads a mapping from all registered channels to their +// channel summaries. +func (m *ClientDB) FetchChanSummaries() (wtdb.ChannelSummaries, error) { m.mu.Lock() defer m.mu.Unlock() - sweepPkScripts := make(map[lnwire.ChannelID][]byte) - for chanID, pkScript := range m.sweepPkScripts { - sweepPkScripts[chanID] = cloneBytes(pkScript) + summaries := make(map[lnwire.ChannelID]wtdb.ClientChanSummary) + for chanID, summary := range m.summaries { + summaries[chanID] = wtdb.ClientChanSummary{ + SweepPkScript: cloneBytes(summary.SweepPkScript), + } } - return sweepPkScripts, nil + return summaries, nil } -// AddChanPkScript sets a pkscript or sweeping funds from the channel or chanID. -func (m *ClientDB) AddChanPkScript(chanID lnwire.ChannelID, pkScript []byte) error { +// RegisterChannel registers a channel for use within the client database. For +// now, all that is stored in the channel summary is the sweep pkscript that +// we'd like any tower sweeps to pay into. In the future, this will be extended +// to contain more info to allow the client efficiently request historical +// states to be backed up under the client's active policy. +func (m *ClientDB) RegisterChannel(chanID lnwire.ChannelID, + sweepPkScript []byte) error { + m.mu.Lock() defer m.mu.Unlock() - if _, ok := m.sweepPkScripts[chanID]; ok { - return fmt.Errorf("pkscript for %x already exists", pkScript) + if _, ok := m.summaries[chanID]; ok { + return wtdb.ErrChannelAlreadyRegistered } - m.sweepPkScripts[chanID] = cloneBytes(pkScript) + m.summaries[chanID] = wtdb.ClientChanSummary{ + SweepPkScript: cloneBytes(sweepPkScript), + } return nil }