From e800aacff430b66ea9e73ae65a8e4056ea6acd0f Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Fri, 11 Aug 2023 14:53:54 +0200
Subject: [PATCH] wtclient+server: unexport and rename TowerClient

Rename and unexport the `TowerClient` struct to `client` and rename the
`TowerClientManager` interface to `ClientManager`.
---
 lnrpc/wtclientrpc/config.go          |  2 +-
 peer/brontide.go                     |  4 +-
 server.go                            | 27 +++-------
 subrpcserver_config.go               |  2 +-
 watchtower/wtclient/client.go        | 79 ++++++++++++++--------
 watchtower/wtclient/client_test.go   |  5 +-
 watchtower/wtclient/manager.go       | 44 ++++++++++------
 watchtower/wtclient/session_queue.go |  2 +-
 8 files changed, 80 insertions(+), 85 deletions(-)

diff --git a/lnrpc/wtclientrpc/config.go b/lnrpc/wtclientrpc/config.go
index 58566c19a..b95fb1197 100644
--- a/lnrpc/wtclientrpc/config.go
+++ b/lnrpc/wtclientrpc/config.go
@@ -17,7 +17,7 @@ type Config struct {
 	// ClientMgr is a tower client manager that manages a set of tower
 	// clients.
-	ClientMgr wtclient.TowerClientManager
+	ClientMgr wtclient.ClientManager
 
 	// Resolver is a custom resolver that will be used to resolve watchtower
 	// addresses to ensure we don't leak any information when running over
diff --git a/peer/brontide.go b/peer/brontide.go
index 1751ef5ac..e593bf393 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -269,7 +269,7 @@ type Config struct {
 	HtlcNotifier *htlcswitch.HtlcNotifier
 
 	// TowerClient is used to backup revoked states.
-	TowerClient wtclient.TowerClientManager
+	TowerClient wtclient.ClientManager
 
 	// DisconnectPeer is used to disconnect this peer if the cooperative close
 	// process fails.
@@ -1036,7 +1036,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
 		return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
 	}
 
-	var towerClient wtclient.TowerClientManager
+	var towerClient wtclient.ClientManager
 	if lnChan.ChanType().IsTaproot() {
 		// Leave the tower client as nil for now until the tower client
 		// has support for taproot channels.
diff --git a/server.go b/server.go
index 076802606..fd53ee5e6 100644
--- a/server.go
+++ b/server.go
@@ -1546,6 +1546,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 
 		fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
 
+		// Copy the policy for legacy channels and set the blob flag
+		// signalling support for anchor channels.
+		anchorPolicy := policy
+		anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
+
 		s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
 			FetchClosedChannel:     fetchClosedChannel,
 			BuildBreachRetribution: buildBreachRetribution,
@@ -1567,25 +1572,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 			MinBackoff:         10 * time.Second,
 			MaxBackoff:         5 * time.Minute,
 			MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
-		})
-		if err != nil {
-			return nil, err
-		}
-
-		// Register a legacy tower client.
-		_, err = s.towerClientMgr.NewClient(policy)
-		if err != nil {
-			return nil, err
-		}
-
-		// Copy the policy for legacy channels and set the blob flag
-		// signalling support for anchor channels.
-		anchorPolicy := policy
-		anchorPolicy.TxPolicy.BlobType |=
-			blob.Type(blob.FlagAnchorChannel)
-
-		// Register an anchors tower client.
-		_, err = s.towerClientMgr.NewClient(anchorPolicy)
+		}, policy, anchorPolicy)
 		if err != nil {
 			return nil, err
 		}
@@ -3782,7 +3769,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	// though the underlying value is nil. To avoid this gotcha which can
 	// cause a panic, we need to explicitly pass nil to the peer.Config's
 	// TowerClient if needed.
-	var towerClient wtclient.TowerClientManager
+	var towerClient wtclient.ClientManager
 	if s.towerClientMgr != nil {
 		towerClient = s.towerClientMgr
 	}
diff --git a/subrpcserver_config.go b/subrpcserver_config.go
index 175515de3..360d7fa31 100644
--- a/subrpcserver_config.go
+++ b/subrpcserver_config.go
@@ -113,7 +113,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config,
 	chanStateDB *channeldb.ChannelStateDB,
 	sweeper *sweep.UtxoSweeper,
 	tower *watchtower.Standalone,
-	towerClientMgr wtclient.TowerClientManager,
+	towerClientMgr wtclient.ClientManager,
 	tcpResolver lncfg.TCPResolver,
 	genInvoiceFeatures func() *lnwire.FeatureVector,
 	genAmpInvoiceFeatures func() *lnwire.FeatureVector,
diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go
index bacc45ab1..24d974ced 100644
--- a/watchtower/wtclient/client.go
+++ b/watchtower/wtclient/client.go
@@ -49,7 +49,7 @@ const (
 // genSessionFilter constructs a filter that can be used to select sessions only
 // if they match the policy of the client (namely anchor vs legacy). If
 // activeOnly is set, then only active sessions will be returned.
-func (c *TowerClient) genSessionFilter(
+func (c *client) genSessionFilter(
 	activeOnly bool) wtdb.ClientSessionFilterFn {
 
 	return func(session *wtdb.ClientSession) bool {
@@ -92,7 +92,7 @@ type BreachRetributionBuilder func(id lnwire.ChannelID,
 	commitHeight uint64) (*lnwallet.BreachRetribution,
 	channeldb.ChannelType, error)
 
-// newTowerMsg is an internal message we'll use within the TowerClient to signal
+// newTowerMsg is an internal message we'll use within the client to signal
 // that a new tower can be considered.
 type newTowerMsg struct {
 	// tower holds the info about the new Tower or new tower address
@@ -106,7 +106,7 @@ type newTowerMsg struct {
 	errChan chan error
 }
 
-// staleTowerMsg is an internal message we'll use within the TowerClient to
+// staleTowerMsg is an internal message we'll use within the client to
 // signal that a tower should no longer be considered.
 type staleTowerMsg struct {
 	// id is the unique database identifier for the tower.
@@ -128,8 +128,8 @@ type staleTowerMsg struct {
 	errChan chan error
 }
 
-// towerClientCfg holds the configuration values required by a TowerClient.
-type towerClientCfg struct {
+// clientCfg holds the configuration values required by a client.
+type clientCfg struct {
 	*Config
 
 	// Policy is the session policy the client will propose when creating
@@ -141,11 +141,10 @@ type towerClientCfg struct {
 	getSweepScript func(lnwire.ChannelID) ([]byte, bool)
 }
 
-// TowerClient is a concrete implementation of the Client interface, offering a
-// non-blocking, reliable subsystem for backing up revoked states to a specified
-// private tower.
-type TowerClient struct {
-	cfg *towerClientCfg
+// client manages backing up revoked states for all channels that fall under a
+// specific policy type.
+type client struct {
+	cfg *clientCfg
 
 	log btclog.Logger
@@ -169,9 +168,9 @@ type TowerClient struct {
 	quit chan struct{}
 }
 
-// newTowerClient initializes a new TowerClient from the provided
-// towerClientCfg. An error is returned if the client could not be initialized.
-func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) {
+// newClient initializes a new client from the provided clientCfg. An error is
+// returned if the client could not be initialized.
+func newClient(cfg *clientCfg) (*client, error) {
 	identifier, err := cfg.Policy.BlobType.Identifier()
 	if err != nil {
 		return nil, err
@@ -188,7 +187,7 @@ func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) {
 		return nil, err
 	}
 
-	c := &TowerClient{
+	c := &client{
 		cfg:      cfg,
 		log:      plog,
 		pipeline: queue,
@@ -349,7 +348,7 @@ func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID,
 
 // start initializes the watchtower client by loading or negotiating an active
 // session and then begins processing backup tasks from the request pipeline.
-func (c *TowerClient) start() error {
+func (c *client) start() error {
 	c.log.Infof("Watchtower client starting")
 
 	// First, restart a session queue for any sessions that have
@@ -397,7 +396,7 @@ func (c *TowerClient) start() error {
 }
 
 // stop idempotently initiates a graceful shutdown of the watchtower client.
-func (c *TowerClient) stop() error {
+func (c *client) stop() error {
 	var returnErr error
 	c.log.Debugf("Stopping watchtower client")
@@ -452,7 +451,7 @@
 //   negotiated policy, or
 // - breached outputs contain too little value to sweep at the target sweep
 //   fee rate.
-func (c *TowerClient) backupState(chanID *lnwire.ChannelID,
+func (c *client) backupState(chanID *lnwire.ChannelID,
 	stateNum uint64) error {
 
 	id := &wtdb.BackupID{
@@ -468,7 +467,7 @@
 // active client's advertised policy will be ignored, but may be resumed if the
 // client is restarted with a matching policy. If no candidates were found, nil
 // is returned to signal that we need to request a new policy.
-func (c *TowerClient) nextSessionQueue() (*sessionQueue, error) {
+func (c *client) nextSessionQueue() (*sessionQueue, error) {
 	// Select any candidate session at random, and remove it from the set of
 	// candidate sessions.
 	var candidateSession *ClientSession
@@ -508,13 +507,13 @@
 // stopAndRemoveSession stops the session with the given ID and removes it from
 // the in-memory active sessions set.
-func (c *TowerClient) stopAndRemoveSession(id wtdb.SessionID) error {
+func (c *client) stopAndRemoveSession(id wtdb.SessionID) error {
 	return c.activeSessions.StopAndRemove(id)
 }
 
 // deleteSessionFromTower dials the tower that we created the session with and
 // attempts to send the tower the DeleteSession message.
-func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error {
+func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error {
 	// First, we check if we have already loaded this tower in our
 	// candidate towers iterator.
 	tower, err := c.candidateTowers.GetTower(sess.TowerID)
@@ -638,10 +637,10 @@
 // backupDispatcher processes events coming from the taskPipeline and is
 // responsible for detecting when the client needs to renegotiate a session to
-// fulfill continuing demand. The event loop exits if the TowerClient is quit.
+// fulfill continuing demand. The event loop exits if the client is quit.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (c *TowerClient) backupDispatcher() {
+func (c *client) backupDispatcher() {
 	defer c.wg.Done()
 
 	c.log.Tracef("Starting backup dispatcher")
@@ -787,7 +786,7 @@
 // sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
 // that are rejected because the active sessionQueue is full will be cached as
 // the prevTask, and should be reprocessed after obtaining a new sessionQueue.
-func (c *TowerClient) processTask(task *wtdb.BackupID) {
+func (c *client) processTask(task *wtdb.BackupID) {
 	script, ok := c.cfg.getSweepScript(task.ChanID)
 	if !ok {
 		log.Infof("not processing task for unregistered channel: %s",
@@ -811,7 +810,7 @@
 // prevTask is always removed as a result of this call. The client's
 // sessionQueue will be removed if accepting the task left the sessionQueue in
 // an exhausted state.
-func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
+func (c *client) taskAccepted(task *wtdb.BackupID,
 	newStatus sessionQueueStatus) {
 
 	c.log.Infof("Queued %v successfully for session %v", task,
@@ -849,7 +848,7 @@
 // exhausted and not shutting down, the client marks the task as ineligible, as
 // this implies we couldn't construct a valid justice transaction given the
 // session's policy.
-func (c *TowerClient) taskRejected(task *wtdb.BackupID,
+func (c *client) taskRejected(task *wtdb.BackupID,
 	curStatus sessionQueueStatus) {
 
 	switch curStatus {
@@ -910,7 +909,7 @@
 // dial connects the peer at addr using privKey as our secret key for the
 // connection. The connection will use the configured Net's resolver to resolve
 // the address for either Tor or clear net connections.
-func (c *TowerClient) dial(localKey keychain.SingleKeyECDH,
+func (c *client) dial(localKey keychain.SingleKeyECDH,
 	addr *lnwire.NetAddress) (wtserver.Peer, error) {
 
 	return c.cfg.AuthDial(localKey, addr, c.cfg.Dial)
@@ -920,7 +919,7 @@
 // error is returned if a message is not received before the server's read
 // timeout, the read off the wire failed, or the message could not be
 // deserialized.
-func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
+func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
 	// Set a read timeout to ensure we drop the connection if nothing is
 	// received in a timely manner.
 	err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout))
@@ -954,7 +953,7 @@
 }
 
 // sendMessage sends a watchtower wire message to the target peer.
-func (c *TowerClient) sendMessage(peer wtserver.Peer,
+func (c *client) sendMessage(peer wtserver.Peer,
 	msg wtwire.Message) error {
 
 	// Encode the next wire message into the buffer.
@@ -988,7 +987,7 @@
 // newSessionQueue creates a sessionQueue from a ClientSession loaded from the
 // database, supplying it with the resources needed by the client.
-func (c *TowerClient) newSessionQueue(s *ClientSession,
+func (c *client) newSessionQueue(s *ClientSession,
 	updates []wtdb.CommittedUpdate) *sessionQueue {
 
 	return newSessionQueue(&sessionQueueConfig{
@@ -1010,7 +1009,7 @@
 // getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the
 // passed ClientSession. If it exists, the active sessionQueue is returned.
 // Otherwise, a new sessionQueue is initialized and added to the set.
-func (c *TowerClient) getOrInitActiveQueue(s *ClientSession,
+func (c *client) getOrInitActiveQueue(s *ClientSession,
 	updates []wtdb.CommittedUpdate) *sessionQueue {
 
 	if sq, ok := c.activeSessions.Get(s.ID); ok {
@@ -1024,7 +1023,7 @@
 // adds the sessionQueue to the activeSessions set, and starts the sessionQueue
 // so that it can deliver any committed updates or begin accepting newly
 // assigned tasks.
-func (c *TowerClient) initActiveQueue(s *ClientSession,
+func (c *client) initActiveQueue(s *ClientSession,
 	updates []wtdb.CommittedUpdate) *sessionQueue {
 
 	// Initialize the session queue, providing it with all the resources it
@@ -1044,7 +1043,7 @@
 // it for new sessions. If the watchtower already exists, then any new addresses
 // included will be considered when dialing it for session negotiations and
 // backups.
-func (c *TowerClient) addTower(tower *Tower) error {
+func (c *client) addTower(tower *Tower) error {
 	errChan := make(chan error, 1)
 
 	select {
@@ -1067,7 +1066,7 @@
 // handleNewTower handles a request for a new tower to be added. If the tower
 // already exists, then its corresponding sessions, if any, will be
 // considered as candidates.
-func (c *TowerClient) handleNewTower(tower *Tower) error {
+func (c *client) handleNewTower(tower *Tower) error {
 	c.candidateTowers.AddCandidate(tower)
 
 	// Include all of its corresponding sessions to our set of candidates.
@@ -1091,7 +1090,7 @@
 // negotiations and from being used for any subsequent backups until it's added
 // again. If an address is provided, then this call only serves as a way of
 // removing the address from the watchtower instead.
-func (c *TowerClient) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
+func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey,
 	addr net.Addr) error {
 
 	errChan := make(chan error, 1)
@@ -1119,7 +1118,7 @@
 // none of the tower's sessions have pending updates, then they will become
 // inactive and removed as candidates. If the active session queue corresponds
 // to any of these sessions, a new one will be negotiated.
-func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
+func (c *client) handleStaleTower(msg *staleTowerMsg) error {
 	// We'll first update our in-memory state.
 	err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr)
 	if err != nil {
@@ -1168,7 +1167,7 @@
 // registeredTowers retrieves the list of watchtowers registered with the
 // client.
-func (c *TowerClient) registeredTowers(towers []*wtdb.Tower,
+func (c *client) registeredTowers(towers []*wtdb.Tower,
 	opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) {
 
 	// Generate a filter that will fetch all the client's sessions
@@ -1209,7 +1208,7 @@
 // lookupTower retrieves the info of sessions held with the given tower handled
 // by this client.
-func (c *TowerClient) lookupTower(tower *wtdb.Tower,
+func (c *client) lookupTower(tower *wtdb.Tower,
 	opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) {
 
 	opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false)))
@@ -1227,19 +1226,19 @@
 }
 
 // getStats returns the in-memory statistics of the client since startup.
-func (c *TowerClient) getStats() ClientStats {
+func (c *client) getStats() ClientStats {
 	return c.stats.getStatsCopy()
 }
 
 // policy returns the active client policy configuration.
-func (c *TowerClient) policy() wtpolicy.Policy {
+func (c *client) policy() wtpolicy.Policy {
 	return c.cfg.Policy
 }
 
 // logMessage writes information about a message received from a remote peer,
 // using directional prepositions to signal whether the message was sent or
 // received.
-func (c *TowerClient) logMessage(
+func (c *client) logMessage(
 	peer wtserver.Peer, msg wtwire.Message, read bool) {
 
 	var action = "Received"
diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go
index 452404919..95e2664eb 100644
--- a/watchtower/wtclient/client_test.go
+++ b/watchtower/wtclient/client_test.go
@@ -560,10 +560,7 @@ func (h *testHarness) startClient() {
 		Address: towerTCPAddr,
 	}
 
-	h.clientMgr, err = wtclient.NewManager(h.clientCfg)
-	require.NoError(h.t, err)
-
-	_, err = h.clientMgr.NewClient(h.clientPolicy)
+	h.clientMgr, err = wtclient.NewManager(h.clientCfg, h.clientPolicy)
 	require.NoError(h.t, err)
 	require.NoError(h.t, h.clientMgr.Start())
 	require.NoError(h.t, h.clientMgr.AddTower(towerAddr))
diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go
index be0042f43..73f259085 100644
--- a/watchtower/wtclient/manager.go
+++ b/watchtower/wtclient/manager.go
@@ -22,9 +22,9 @@ import (
 	"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
 )
 
-// TowerClientManager is the primary interface used by the daemon to control a
+// ClientManager is the primary interface used by the daemon to control a
 // client's lifecycle and backup revoked states.
-type TowerClientManager interface {
+type ClientManager interface {
 	// AddTower adds a new watchtower reachable at the given address and
 	// considers it for new sessions. If the watchtower already exists, then
 	// any new addresses included will be considered when dialing it for
@@ -67,7 +67,7 @@ type TowerClientManager interface {
 	BackupState(chanID *lnwire.ChannelID, stateNum uint64) error
 }
 
-// Config provides the TowerClient with access to the resources it requires to
+// Config provides the client with access to the resources it requires to
 // perform its duty. All nillable fields must be non-nil for the tower to be
 // initialized properly.
 type Config struct {
@@ -156,7 +156,7 @@ type Manager struct {
 
 	cfg *Config
 
-	clients   map[blob.Type]*TowerClient
+	clients   map[blob.Type]*client
 	clientsMu sync.Mutex
 
 	backupMu sync.Mutex
@@ -169,10 +169,10 @@ type Manager struct {
 	quit chan struct{}
 }
 
-var _ TowerClientManager = (*Manager)(nil)
+var _ ClientManager = (*Manager)(nil)
 
 // NewManager constructs a new Manager.
-func NewManager(config *Config) (*Manager, error) {
+func NewManager(config *Config, policies ...wtpolicy.Policy) (*Manager, error) {
 	// Copy the config to prevent side effects from modifying both the
 	// internal and external version of the Config.
 	cfg := *config
@@ -192,42 +192,54 @@ func NewManager(config *Config) (*Manager, error) {
 		return nil, err
 	}
 
-	return &Manager{
+	m := &Manager{
 		cfg:                  &cfg,
-		clients:              make(map[blob.Type]*TowerClient),
+		clients:              make(map[blob.Type]*client),
 		chanBlobType:         make(map[lnwire.ChannelID]blob.Type),
 		chanInfos:            chanInfos,
 		closableSessionQueue: newSessionCloseMinHeap(),
 		quit:                 make(chan struct{}),
-	}, nil
+	}
+
+	for _, policy := range policies {
+		if err = policy.Validate(); err != nil {
+			return nil, err
+		}
+
+		if err = m.newClient(policy); err != nil {
+			return nil, err
+		}
+	}
+
+	return m, nil
 }
 
-// NewClient constructs a new TowerClient and adds it to the set of clients that
+// newClient constructs a new client and adds it to the set of clients that
 // the Manager is keeping track of.
-func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) {
+func (m *Manager) newClient(policy wtpolicy.Policy) error {
 	m.clientsMu.Lock()
 	defer m.clientsMu.Unlock()
 
 	_, ok := m.clients[policy.BlobType]
 	if ok {
-		return nil, fmt.Errorf("a client with blob type %s has "+
+		return fmt.Errorf("a client with blob type %s has "+
 			"already been registered", policy.BlobType)
 	}
 
-	cfg := &towerClientCfg{
+	cfg := &clientCfg{
 		Config:         m.cfg,
 		Policy:         policy,
 		getSweepScript: m.getSweepScript,
 	}
 
-	client, err := newTowerClient(cfg)
+	client, err := newClient(cfg)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	m.clients[policy.BlobType] = client
 
-	return client, nil
+	return nil
 }
 
 // Start starts all the clients that have been registered with the Manager.
diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go
index 27c36c6fe..1bd7a4ddb 100644
--- a/watchtower/wtclient/session_queue.go
+++ b/watchtower/wtclient/session_queue.go
@@ -94,7 +94,7 @@ type sessionQueueConfig struct {
 // sessionQueue implements a reliable queue that will encrypt and send accepted
 // backups to the watchtower specified in the config's ClientSession. Calling
 // Stop will attempt to perform a clean shutdown replaying any un-committed
-// pending updates to the TowerClient's main task pipeline.
+// pending updates to the client's main task pipeline.
 type sessionQueue struct {
 	started sync.Once
 	stopped sync.Once
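
After this change, callers no longer register per-policy clients themselves: NewManager
validates every policy passed to it and registers one unexported client per policy, as
the server.go and client_test.go hunks above show. A minimal sketch of the resulting
one-step construction (the helper name and the wtpolicy.DefaultPolicy starting point are
illustrative assumptions, not part of this patch):

	package example

	import (
		"github.com/lightningnetwork/lnd/watchtower/blob"
		"github.com/lightningnetwork/lnd/watchtower/wtclient"
		"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
	)

	// buildTowerClientMgr mirrors what server.go now does: derive the anchor
	// policy from the legacy one, then hand both policies to NewManager in a
	// single call. cfg is assumed to be fully populated by the caller.
	func buildTowerClientMgr(cfg *wtclient.Config) (*wtclient.Manager, error) {
		// Policy proposed for sessions backing up legacy (pre-anchor)
		// channels.
		policy := wtpolicy.DefaultPolicy()

		// Copy the policy and set the blob flag signalling support for
		// anchor channels, exactly as the server.go hunk above does.
		anchorPolicy := policy
		anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)

		// NewManager validates each policy and registers an unexported
		// client for it; the separate NewClient calls are gone.
		return wtclient.NewManager(cfg, policy, anchorPolicy)
	}

With client unexported, external packages now interact only through the ClientManager
interface (AddTower, BackupState, and the other methods it declares), which is the type
the wtclientrpc and brontide config fields carry after this patch.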