diff --git a/lnrpc/wtclientrpc/config.go b/lnrpc/wtclientrpc/config.go index 9127c0846..ed0401b13 100644 --- a/lnrpc/wtclientrpc/config.go +++ b/lnrpc/wtclientrpc/config.go @@ -23,6 +23,10 @@ type Config struct { // we'll interact through the watchtower RPC subserver. AnchorClient wtclient.Client + // ClientMgr is a tower client manager that manages a set of tower + // clients. + ClientMgr wtclient.TowerClientManager + // Resolver is a custom resolver that will be used to resolve watchtower // addresses to ensure we don't leak any information when running over // non-clear networks, e.g. Tor, etc. diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index aa7e1ae21..ad99f0541 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context, Address: addr, } - // TODO(conner): make atomic via multiplexed client - if err := c.cfg.Client.AddTower(towerAddr); err != nil { - return nil, err - } - if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil { + if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil { return nil, err } diff --git a/rpcserver.go b/rpcserver.go index fa23f0485..4685b731b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -744,7 +744,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter, routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB, s.sweeper, tower, s.towerClient, s.anchorTowerClient, - r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, + s.towerClientMgr, r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, genAmpInvoiceFeatures, s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr, rpcsLog, s.aliasMgr.GetPeerAlias, diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 7706dfd27..27bef9f92 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -115,6 +115,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, tower *watchtower.Standalone, towerClient wtclient.Client, anchorTowerClient wtclient.Client, + towerClientMgr wtclient.TowerClientManager, tcpResolver lncfg.TCPResolver, genInvoiceFeatures func() *lnwire.FeatureVector, genAmpInvoiceFeatures func() *lnwire.FeatureVector, @@ -297,6 +298,9 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, subCfgValue.FieldByName("AnchorClient").Set( reflect.ValueOf(anchorTowerClient), ) + subCfgValue.FieldByName("ClientMgr").Set( + reflect.ValueOf(towerClientMgr), + ) } subCfgValue.FieldByName("Resolver").Set( reflect.ValueOf(tcpResolver), diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 3441891ee..e1749374a 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,12 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // AddTower adds a new watchtower reachable at the given address and - // considers it for new sessions. If the watchtower already exists, then - // any new addresses included will be considered when dialing it for - // session negotiations and backups. - AddTower(*lnwire.NetAddress) error - // RemoveTower removes a watchtower from being considered for future // session negotiations and from being used for any subsequent backups // until it's added again. If an address is provided, then this call @@ -145,9 +139,9 @@ type BreachRetributionBuilder func(id lnwire.ChannelID, // newTowerMsg is an internal message we'll use within the TowerClient to signal // that a new tower can be considered. type newTowerMsg struct { - // addr is the tower's reachable address that we'll use to establish a - // connection with. - addr *lnwire.NetAddress + // tower holds the info about the new Tower or new tower address + // required to connect to it. + tower *Tower // errChan is the channel through which we'll send a response back to // the caller when handling their request. @@ -1071,7 +1065,7 @@ func (c *TowerClient) backupDispatcher() { // its corresponding sessions, if any, as new // candidates. case msg := <-c.newTowers: - msg.errChan <- c.handleNewTower(msg) + msg.errChan <- c.handleNewTower(msg.tower) // A tower has been requested to be removed. We'll // only allow removal of it if the address in question @@ -1155,7 +1149,7 @@ func (c *TowerClient) backupDispatcher() { // its corresponding sessions, if any, as new // candidates. case msg := <-c.newTowers: - msg.errChan <- c.handleNewTower(msg) + msg.errChan <- c.handleNewTower(msg.tower) // A tower has been removed, so we'll remove certain // information that's persisted and also in our @@ -1451,16 +1445,16 @@ func (c *TowerClient) isChannelClosed(id lnwire.ChannelID) (bool, uint32, return true, chanSum.CloseHeight, nil } -// AddTower adds a new watchtower reachable at the given address and considers +// addTower adds a new watchtower reachable at the given address and considers // it for new sessions. If the watchtower already exists, then any new addresses // included will be considered when dialing it for session negotiations and // backups. -func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { +func (c *TowerClient) addTower(tower *Tower) error { errChan := make(chan error, 1) select { case c.newTowers <- &newTowerMsg{ - addr: addr, + tower: tower, errChan: errChan, }: case <-c.pipeline.quit: @@ -1478,20 +1472,7 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { // handleNewTower handles a request for a new tower to be added. If the tower // already exists, then its corresponding sessions, if any, will be set // considered as candidates. -func (c *TowerClient) handleNewTower(msg *newTowerMsg) error { - // We'll start by updating our persisted state, followed by our - // in-memory state, with the new tower. This might not actually be a new - // tower, but it might include a new address at which it can be reached. - dbTower, err := c.cfg.DB.CreateTower(msg.addr) - if err != nil { - return err - } - - tower, err := NewTowerFromDBTower(dbTower) - if err != nil { - return err - } - +func (c *TowerClient) handleNewTower(tower *Tower) error { c.candidateTowers.AddCandidate(tower) // Include all of its corresponding sessions to our set of candidates. diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index ee383b495..6be1bf496 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -567,7 +567,7 @@ func (h *testHarness) startClient() { h.client, err = h.clientMgr.NewClient(h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.clientMgr.Start()) - require.NoError(h.t, h.client.AddTower(towerAddr)) + require.NoError(h.t, h.clientMgr.AddTower(towerAddr)) } // chanIDFromInt creates a unique channel id given a unique integral id. @@ -757,7 +757,7 @@ func (h *testHarness) recvPayments(id, from, to uint64, func (h *testHarness) addTower(addr *lnwire.NetAddress) { h.t.Helper() - err := h.client.AddTower(addr) + err := h.clientMgr.AddTower(addr) require.NoError(h.t, err) } @@ -1714,7 +1714,7 @@ var clientTests = []clientTest{ h.server.addr = towerAddr // Add the new tower address to the client. - err = h.client.AddTower(towerAddr) + err = h.clientMgr.AddTower(towerAddr) require.NoError(h.t, err) // Remove the old tower address from the client. @@ -1795,11 +1795,11 @@ var clientTests = []clientTest{ require.NoError(h.t, h.server.server.Start()) // Re-add the server to the client - err = h.client.AddTower(h.server.addr) + err = h.clientMgr.AddTower(h.server.addr) require.NoError(h.t, err) // Also add the new tower address. - err = h.client.AddTower(towerAddr) + err = h.clientMgr.AddTower(towerAddr) require.NoError(h.t, err) // Assert that if the client attempts to remove the diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index b26ef3a6f..d583d6c52 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -16,6 +16,16 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtpolicy" ) +// TowerClientManager is the primary interface used by the daemon to control a +// client's lifecycle and backup revoked states. +type TowerClientManager interface { + // AddTower adds a new watchtower reachable at the given address and + // considers it for new sessions. If the watchtower already exists, then + // any new addresses included will be considered when dialing it for + // session negotiations and backups. + AddTower(*lnwire.NetAddress) error +} + // Config provides the TowerClient with access to the resources it requires to // perform its duty. All nillable fields must be non-nil for the tower to be // initialized properly. @@ -109,6 +119,8 @@ type Manager struct { clientsMu sync.Mutex } +var _ TowerClientManager = (*Manager)(nil) + // NewManager constructs a new Manager. func NewManager(config *Config) (*Manager, error) { // Copy the config to prevent side effects from modifying both the @@ -192,3 +204,42 @@ func (m *Manager) Stop() error { return returnErr } + +// AddTower adds a new watchtower reachable at the given address and considers +// it for new sessions. If the watchtower already exists, then any new addresses +// included will be considered when dialing it for session negotiations and +// backups. +func (m *Manager) AddTower(address *lnwire.NetAddress) error { + // We'll start by updating our persisted state, followed by the + // in-memory state of each client, with the new tower. This might not + // actually be a new tower, but it might include a new address at which + // it can be reached. + dbTower, err := m.cfg.DB.CreateTower(address) + if err != nil { + return err + } + + tower, err := NewTowerFromDBTower(dbTower) + if err != nil { + return err + } + + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + for blobType, client := range m.clients { + clientType, err := blobType.Identifier() + if err != nil { + return err + } + + if err := client.addTower(tower); err != nil { + return fmt.Errorf("could not add tower(%x) to the %s "+ + "tower client: %w", + tower.IdentityKey.SerializeCompressed(), + clientType, err) + } + } + + return nil +}