multi: move AddTower to Tower Client Manager

In this commit we move the AddTower method from the Client interface to
the TowerClientManager interface. The wtclientrpc is updated to call the
`AddTower` method of the Manager instead of calling the `AddTower`
method of each individual client. The TowerClient now is also only
concerned with adding a new tower (or new tower address) to its
in-memory state; the tower Manager will handle adding the tower to the
DB.
This commit is contained in:
Elle Mouton 2023-05-16 13:28:11 +02:00
parent 2abc422aac
commit a44bf381c4
No known key found for this signature in database
GPG Key ID: D7D916376026F177
7 changed files with 75 additions and 39 deletions

View File

@ -23,6 +23,10 @@ type Config struct {
// we'll interact through the watchtower RPC subserver.
AnchorClient wtclient.Client
// ClientMgr is a tower client manager that manages a set of tower
// clients.
ClientMgr wtclient.TowerClientManager
// Resolver is a custom resolver that will be used to resolve watchtower
// addresses to ensure we don't leak any information when running over
// non-clear networks, e.g. Tor, etc.

View File

@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context,
Address: addr,
}
// TODO(conner): make atomic via multiplexed client
if err := c.cfg.Client.AddTower(towerAddr); err != nil {
return nil, err
}
if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil {
if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil {
return nil, err
}

View File

@ -744,7 +744,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB,
s.sweeper, tower, s.towerClient, s.anchorTowerClient,
r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
s.towerClientMgr, r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
genAmpInvoiceFeatures, s.getNodeAnnouncement,
s.updateAndBrodcastSelfNode, parseAddr, rpcsLog,
s.aliasMgr.GetPeerAlias,

View File

@ -115,6 +115,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config,
tower *watchtower.Standalone,
towerClient wtclient.Client,
anchorTowerClient wtclient.Client,
towerClientMgr wtclient.TowerClientManager,
tcpResolver lncfg.TCPResolver,
genInvoiceFeatures func() *lnwire.FeatureVector,
genAmpInvoiceFeatures func() *lnwire.FeatureVector,
@ -297,6 +298,9 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config,
subCfgValue.FieldByName("AnchorClient").Set(
reflect.ValueOf(anchorTowerClient),
)
subCfgValue.FieldByName("ClientMgr").Set(
reflect.ValueOf(towerClientMgr),
)
}
subCfgValue.FieldByName("Resolver").Set(
reflect.ValueOf(tcpResolver),

View File

@ -95,12 +95,6 @@ type RegisteredTower struct {
// Client is the primary interface used by the daemon to control a client's
// lifecycle and backup revoked states.
type Client interface {
// AddTower adds a new watchtower reachable at the given address and
// considers it for new sessions. If the watchtower already exists, then
// any new addresses included will be considered when dialing it for
// session negotiations and backups.
AddTower(*lnwire.NetAddress) error
// RemoveTower removes a watchtower from being considered for future
// session negotiations and from being used for any subsequent backups
// until it's added again. If an address is provided, then this call
@ -145,9 +139,9 @@ type BreachRetributionBuilder func(id lnwire.ChannelID,
// newTowerMsg is an internal message we'll use within the TowerClient to signal
// that a new tower can be considered.
type newTowerMsg struct {
// addr is the tower's reachable address that we'll use to establish a
// connection with.
addr *lnwire.NetAddress
// tower holds the info about the new Tower or new tower address
// required to connect to it.
tower *Tower
// errChan is the channel through which we'll send a response back to
// the caller when handling their request.
@ -1071,7 +1065,7 @@ func (c *TowerClient) backupDispatcher() {
// its corresponding sessions, if any, as new
// candidates.
case msg := <-c.newTowers:
msg.errChan <- c.handleNewTower(msg)
msg.errChan <- c.handleNewTower(msg.tower)
// A tower has been requested to be removed. We'll
// only allow removal of it if the address in question
@ -1155,7 +1149,7 @@ func (c *TowerClient) backupDispatcher() {
// its corresponding sessions, if any, as new
// candidates.
case msg := <-c.newTowers:
msg.errChan <- c.handleNewTower(msg)
msg.errChan <- c.handleNewTower(msg.tower)
// A tower has been removed, so we'll remove certain
// information that's persisted and also in our
@ -1451,16 +1445,16 @@ func (c *TowerClient) isChannelClosed(id lnwire.ChannelID) (bool, uint32,
return true, chanSum.CloseHeight, nil
}
// AddTower adds a new watchtower reachable at the given address and considers
// addTower adds a new watchtower reachable at the given address and considers
// it for new sessions. If the watchtower already exists, then any new addresses
// included will be considered when dialing it for session negotiations and
// backups.
func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error {
func (c *TowerClient) addTower(tower *Tower) error {
errChan := make(chan error, 1)
select {
case c.newTowers <- &newTowerMsg{
addr: addr,
tower: tower,
errChan: errChan,
}:
case <-c.pipeline.quit:
@ -1478,20 +1472,7 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error {
// handleNewTower handles a request for a new tower to be added. If the tower
// already exists, then its corresponding sessions, if any, will be set
// considered as candidates.
func (c *TowerClient) handleNewTower(msg *newTowerMsg) error {
// We'll start by updating our persisted state, followed by our
// in-memory state, with the new tower. This might not actually be a new
// tower, but it might include a new address at which it can be reached.
dbTower, err := c.cfg.DB.CreateTower(msg.addr)
if err != nil {
return err
}
tower, err := NewTowerFromDBTower(dbTower)
if err != nil {
return err
}
func (c *TowerClient) handleNewTower(tower *Tower) error {
c.candidateTowers.AddCandidate(tower)
// Include all of its corresponding sessions to our set of candidates.

View File

@ -567,7 +567,7 @@ func (h *testHarness) startClient() {
h.client, err = h.clientMgr.NewClient(h.clientPolicy)
require.NoError(h.t, err)
require.NoError(h.t, h.clientMgr.Start())
require.NoError(h.t, h.client.AddTower(towerAddr))
require.NoError(h.t, h.clientMgr.AddTower(towerAddr))
}
// chanIDFromInt creates a unique channel id given a unique integral id.
@ -757,7 +757,7 @@ func (h *testHarness) recvPayments(id, from, to uint64,
func (h *testHarness) addTower(addr *lnwire.NetAddress) {
h.t.Helper()
err := h.client.AddTower(addr)
err := h.clientMgr.AddTower(addr)
require.NoError(h.t, err)
}
@ -1714,7 +1714,7 @@ var clientTests = []clientTest{
h.server.addr = towerAddr
// Add the new tower address to the client.
err = h.client.AddTower(towerAddr)
err = h.clientMgr.AddTower(towerAddr)
require.NoError(h.t, err)
// Remove the old tower address from the client.
@ -1795,11 +1795,11 @@ var clientTests = []clientTest{
require.NoError(h.t, h.server.server.Start())
// Re-add the server to the client
err = h.client.AddTower(h.server.addr)
err = h.clientMgr.AddTower(h.server.addr)
require.NoError(h.t, err)
// Also add the new tower address.
err = h.client.AddTower(towerAddr)
err = h.clientMgr.AddTower(towerAddr)
require.NoError(h.t, err)
// Assert that if the client attempts to remove the

View File

@ -16,6 +16,16 @@ import (
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
)
// TowerClientManager is the primary interface used by the daemon to control a
// client's lifecycle and backup revoked states.
type TowerClientManager interface {
// AddTower adds a new watchtower reachable at the given address and
// considers it for new sessions. If the watchtower already exists, then
// any new addresses included will be considered when dialing it for
// session negotiations and backups.
AddTower(*lnwire.NetAddress) error
}
// Config provides the TowerClient with access to the resources it requires to
// perform its duty. All nillable fields must be non-nil for the tower to be
// initialized properly.
@ -109,6 +119,8 @@ type Manager struct {
clientsMu sync.Mutex
}
var _ TowerClientManager = (*Manager)(nil)
// NewManager constructs a new Manager.
func NewManager(config *Config) (*Manager, error) {
// Copy the config to prevent side effects from modifying both the
@ -192,3 +204,42 @@ func (m *Manager) Stop() error {
return returnErr
}
// AddTower adds a new watchtower reachable at the given address and considers
// it for new sessions. If the watchtower already exists, then any new addresses
// included will be considered when dialing it for session negotiations and
// backups.
func (m *Manager) AddTower(address *lnwire.NetAddress) error {
// We'll start by updating our persisted state, followed by the
// in-memory state of each client, with the new tower. This might not
// actually be a new tower, but it might include a new address at which
// it can be reached.
dbTower, err := m.cfg.DB.CreateTower(address)
if err != nil {
return err
}
tower, err := NewTowerFromDBTower(dbTower)
if err != nil {
return err
}
m.clientsMu.Lock()
defer m.clientsMu.Unlock()
for blobType, client := range m.clients {
clientType, err := blobType.Identifier()
if err != nil {
return err
}
if err := client.addTower(tower); err != nil {
return fmt.Errorf("could not add tower(%x) to the %s "+
"tower client: %w",
tower.IdentityKey.SerializeCompressed(),
clientType, err)
}
}
return nil
}