From ab0375e0c12d151193ef0be562016a32371344d5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 11:28:29 +0200 Subject: [PATCH 01/12] wtclient+server: introduce tower client Manager Introduce a wtclient `Manager` which handles tower clients. It indexes clients by the policy used. The policy field is thus removed from the `Config` struct which configures the Manager and is instead added to a new `towerClientCfg` which configures a specific client managed by the manager. For now, only the `NewClient` method is added to the Manager. It can be used to construct a new `TowerClient`. The Manager currently does notthing with the clients added to it. --- server.go | 38 +++---- watchtower/wtclient/client.go | 123 +++-------------------- watchtower/wtclient/client_test.go | 40 ++++---- watchtower/wtclient/manager.go | 156 +++++++++++++++++++++++++++++ 4 files changed, 204 insertions(+), 153 deletions(-) create mode 100644 watchtower/wtclient/manager.go diff --git a/server.go b/server.go index fcc489435..f23b4415d 100644 --- a/server.go +++ b/server.go @@ -282,6 +282,8 @@ type server struct { sphinx *hop.OnionProcessor + towerClientMgr *wtclient.Manager + towerClient wtclient.Client anchorTowerClient wtclient.Client @@ -1548,7 +1550,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID - s.towerClient, err = wtclient.New(&wtclient.Config{ + s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{ FetchClosedChannel: fetchClosedChannel, BuildBreachRetribution: buildBreachRetribution, SessionCloseRange: cfg.WtClient.SessionCloseRange, @@ -1565,7 +1567,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Dial: cfg.net.Dial, AuthDial: authDial, DB: dbs.TowerClientDB, - Policy: policy, ChainHash: *s.cfg.ActiveNetParams.GenesisHash, MinBackoff: 10 * time.Second, MaxBackoff: 5 * time.Minute, @@ -1575,35 +1576,22 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, err } + // Register a legacy tower client. + s.towerClient, err = s.towerClientMgr.NewClient(policy) + if err != nil { + return nil, err + } + // Copy the policy for legacy channels and set the blob flag // signalling support for anchor channels. anchorPolicy := policy anchorPolicy.TxPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel) - s.anchorTowerClient, err = wtclient.New(&wtclient.Config{ - FetchClosedChannel: fetchClosedChannel, - BuildBreachRetribution: buildBreachRetribution, - SessionCloseRange: cfg.WtClient.SessionCloseRange, - ChainNotifier: s.cc.ChainNotifier, - SubscribeChannelEvents: func() (subscribe.Subscription, - error) { - - return s.channelNotifier. - SubscribeChannelEvents() - }, - Signer: cc.Wallet.Cfg.Signer, - NewAddress: newSweepPkScriptGen(cc.Wallet), - SecretKeyRing: s.cc.KeyRing, - Dial: cfg.net.Dial, - AuthDial: authDial, - DB: dbs.TowerClientDB, - Policy: anchorPolicy, - ChainHash: *s.cfg.ActiveNetParams.GenesisHash, - MinBackoff: 10 * time.Second, - MaxBackoff: 5 * time.Minute, - MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, - }) + // Register an anchors tower client. 
+ s.anchorTowerClient, err = s.towerClientMgr.NewClient( + anchorPolicy, + ) if err != nil { return nil, err } diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index f9036e87f..68839b8df 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -11,19 +11,16 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/fn" - "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/subscribe" - "github.com/lightningnetwork/lnd/tor" "github.com/lightningnetwork/lnd/watchtower/wtdb" "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "github.com/lightningnetwork/lnd/watchtower/wtserver" @@ -148,92 +145,6 @@ type Client interface { Stop() error } -// Config provides the TowerClient with access to the resources it requires to -// perform its duty. All nillable fields must be non-nil for the tower to be -// initialized properly. -type Config struct { - // Signer provides access to the wallet so that the client can sign - // justice transactions that spend from a remote party's commitment - // transaction. - Signer input.Signer - - // SubscribeChannelEvents can be used to subscribe to channel event - // notifications. - SubscribeChannelEvents func() (subscribe.Subscription, error) - - // FetchClosedChannel can be used to fetch the info about a closed - // channel. If the channel is not found or not yet closed then - // channeldb.ErrClosedChannelNotFound will be returned. - FetchClosedChannel func(cid lnwire.ChannelID) ( - *channeldb.ChannelCloseSummary, error) - - // ChainNotifier can be used to subscribe to block notifications. - ChainNotifier chainntnfs.ChainNotifier - - // BuildBreachRetribution is a function closure that allows the client - // fetch the breach retribution info for a certain channel at a certain - // revoked commitment height. - BuildBreachRetribution BreachRetributionBuilder - - // NewAddress generates a new on-chain sweep pkscript. - NewAddress func() ([]byte, error) - - // SecretKeyRing is used to derive the session keys used to communicate - // with the tower. The client only stores the KeyLocators internally so - // that we never store private keys on disk. - SecretKeyRing ECDHKeyRing - - // Dial connects to an addr using the specified net and returns the - // connection object. - Dial tor.DialFunc - - // AuthDialer establishes a brontide connection over an onion or clear - // network. - AuthDial AuthDialer - - // DB provides access to the client's stable storage medium. - DB DB - - // Policy is the session policy the client will propose when creating - // new sessions with the tower. If the policy differs from any active - // sessions recorded in the database, those sessions will be ignored and - // new sessions will be requested immediately. - Policy wtpolicy.Policy - - // ChainHash identifies the chain that the client is on and for which - // the tower must be watching to monitor for breaches. - ChainHash chainhash.Hash - - // ReadTimeout is the duration we will wait during a read before - // breaking out of a blocking read. 
If the value is less than or equal - // to zero, the default will be used instead. - ReadTimeout time.Duration - - // WriteTimeout is the duration we will wait during a write before - // breaking out of a blocking write. If the value is less than or equal - // to zero, the default will be used instead. - WriteTimeout time.Duration - - // MinBackoff defines the initial backoff applied to connections with - // watchtowers. Subsequent backoff durations will grow exponentially up - // until MaxBackoff. - MinBackoff time.Duration - - // MaxBackoff defines the maximum backoff applied to connections with - // watchtowers. If the exponential backoff produces a timeout greater - // than this value, the backoff will be clamped to MaxBackoff. - MaxBackoff time.Duration - - // SessionCloseRange is the range over which we will generate a random - // number of blocks to delay closing a session after its last channel - // has been closed. - SessionCloseRange uint32 - - // MaxTasksInMemQueue is the maximum number of backup tasks that should - // be kept in-memory. Any more tasks will overflow to disk. - MaxTasksInMemQueue uint64 -} - // BreachRetributionBuilder is a function that can be used to construct a // BreachRetribution from a channel ID and a commitment height. type BreachRetributionBuilder func(id lnwire.ChannelID, @@ -273,6 +184,17 @@ type staleTowerMsg struct { errChan chan error } +// towerClientCfg holds the configuration values required by a TowerClient. +type towerClientCfg struct { + *Config + + // Policy is the session policy the client will propose when creating + // new sessions with the tower. If the policy differs from any active + // sessions recorded in the database, those sessions will be ignored and + // new sessions will be requested immediately. + Policy wtpolicy.Policy +} + // TowerClient is a concrete implementation of the Client interface, offering a // non-blocking, reliable subsystem for backing up revoked states to a specified // private tower. @@ -280,7 +202,7 @@ type TowerClient struct { started sync.Once stopped sync.Once - cfg *Config + cfg *towerClientCfg log btclog.Logger @@ -313,24 +235,9 @@ type TowerClient struct { // interface. var _ Client = (*TowerClient)(nil) -// New initializes a new TowerClient from the provide Config. An error is -// returned if the client could not be initialized. -func New(config *Config) (*TowerClient, error) { - // Copy the config to prevent side effects from modifying both the - // internal and external version of the Config. - cfg := new(Config) - *cfg = *config - - // Set the read timeout to the default if none was provided. - if cfg.ReadTimeout <= 0 { - cfg.ReadTimeout = DefaultReadTimeout - } - - // Set the write timeout to the default if none was provided. - if cfg.WriteTimeout <= 0 { - cfg.WriteTimeout = DefaultWriteTimeout - } - +// newTowerClient initializes a new TowerClient from the provided +// towerClientCfg. An error is returned if the client could not be initialized. 
+func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { identifier, err := cfg.Policy.BlobType.Identifier() if err != nil { return nil, err diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 51dd16e09..00404b40f 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -395,15 +395,16 @@ func (c *mockChannel) getState( } type testHarness struct { - t *testing.T - cfg harnessCfg - signer *wtmock.MockSigner - capacity lnwire.MilliSatoshi - clientDB *wtdb.ClientDB - clientCfg *wtclient.Config - client wtclient.Client - server *serverHarness - net *mockNet + t *testing.T + cfg harnessCfg + signer *wtmock.MockSigner + capacity lnwire.MilliSatoshi + clientDB *wtdb.ClientDB + clientCfg *wtclient.Config + clientPolicy wtpolicy.Policy + client wtclient.Client + server *serverHarness + net *mockNet blockEvents *mockBlockSub height int32 @@ -486,6 +487,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { return &channeldb.ChannelCloseSummary{CloseHeight: height}, nil } + h.clientPolicy = cfg.policy h.clientCfg = &wtclient.Config{ Signer: signer, SubscribeChannelEvents: func() (subscribe.Subscription, error) { @@ -497,7 +499,6 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { DB: clientDB, AuthDial: mockNet.AuthDial, SecretKeyRing: wtmock.NewSecretKeyRing(), - Policy: cfg.policy, NewAddress: func() ([]byte, error) { return addrScript, nil }, @@ -559,7 +560,10 @@ func (h *testHarness) startClient() { Address: towerTCPAddr, } - h.client, err = wtclient.New(h.clientCfg) + m, err := wtclient.NewManager(h.clientCfg) + require.NoError(h.t, err) + + h.client, err = m.NewClient(h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.client.Start()) require.NoError(h.t, h.client.AddTower(towerAddr)) @@ -1452,9 +1456,7 @@ var clientTests = []clientTest{ // Assert that the server has updates for the clients // most recent policy. - h.server.assertUpdatesForPolicy( - hints, h.clientCfg.Policy, - ) + h.server.assertUpdatesForPolicy(hints, h.clientPolicy) }, }, { @@ -1496,7 +1498,7 @@ var clientTests = []clientTest{ // Restart the client with a new policy, which will // immediately try to overwrite the prior session with // the old policy. - h.clientCfg.Policy.SweepFeeRate *= 2 + h.clientPolicy.SweepFeeRate *= 2 h.startClient() // Wait for all the updates to be populated in the @@ -1505,9 +1507,7 @@ var clientTests = []clientTest{ // Assert that the server has updates for the clients // most recent policy. - h.server.assertUpdatesForPolicy( - hints, h.clientCfg.Policy, - ) + h.server.assertUpdatesForPolicy(hints, h.clientPolicy) }, }, { @@ -1549,10 +1549,10 @@ var clientTests = []clientTest{ // adjusting the MaxUpdates. The client should detect // that the two policies have equivalent TxPolicies and // continue using the first. - expPolicy := h.clientCfg.Policy + expPolicy := h.clientPolicy // Restart the client with a new policy. - h.clientCfg.Policy.MaxUpdates = 20 + h.clientPolicy.MaxUpdates = 20 h.startClient() // Now, queue the second half of the retributions. 
diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go new file mode 100644 index 000000000..922766d2d --- /dev/null +++ b/watchtower/wtclient/manager.go @@ -0,0 +1,156 @@ +package wtclient + +import ( + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/subscribe" + "github.com/lightningnetwork/lnd/tor" + "github.com/lightningnetwork/lnd/watchtower/blob" + "github.com/lightningnetwork/lnd/watchtower/wtpolicy" +) + +// Config provides the TowerClient with access to the resources it requires to +// perform its duty. All nillable fields must be non-nil for the tower to be +// initialized properly. +type Config struct { + // Signer provides access to the wallet so that the client can sign + // justice transactions that spend from a remote party's commitment + // transaction. + Signer input.Signer + + // SubscribeChannelEvents can be used to subscribe to channel event + // notifications. + SubscribeChannelEvents func() (subscribe.Subscription, error) + + // FetchClosedChannel can be used to fetch the info about a closed + // channel. If the channel is not found or not yet closed then + // channeldb.ErrClosedChannelNotFound will be returned. + FetchClosedChannel func(cid lnwire.ChannelID) ( + *channeldb.ChannelCloseSummary, error) + + // ChainNotifier can be used to subscribe to block notifications. + ChainNotifier chainntnfs.ChainNotifier + + // BuildBreachRetribution is a function closure that allows the client + // fetch the breach retribution info for a certain channel at a certain + // revoked commitment height. + BuildBreachRetribution BreachRetributionBuilder + + // NewAddress generates a new on-chain sweep pkscript. + NewAddress func() ([]byte, error) + + // SecretKeyRing is used to derive the session keys used to communicate + // with the tower. The client only stores the KeyLocators internally so + // that we never store private keys on disk. + SecretKeyRing ECDHKeyRing + + // Dial connects to an addr using the specified net and returns the + // connection object. + Dial tor.DialFunc + + // AuthDialer establishes a brontide connection over an onion or clear + // network. + AuthDial AuthDialer + + // DB provides access to the client's stable storage medium. + DB DB + + // ChainHash identifies the chain that the client is on and for which + // the tower must be watching to monitor for breaches. + ChainHash chainhash.Hash + + // ReadTimeout is the duration we will wait during a read before + // breaking out of a blocking read. If the value is less than or equal + // to zero, the default will be used instead. + ReadTimeout time.Duration + + // WriteTimeout is the duration we will wait during a write before + // breaking out of a blocking write. If the value is less than or equal + // to zero, the default will be used instead. + WriteTimeout time.Duration + + // MinBackoff defines the initial backoff applied to connections with + // watchtowers. Subsequent backoff durations will grow exponentially up + // until MaxBackoff. + MinBackoff time.Duration + + // MaxBackoff defines the maximum backoff applied to connections with + // watchtowers. If the exponential backoff produces a timeout greater + // than this value, the backoff will be clamped to MaxBackoff. 
+ MaxBackoff time.Duration + + // SessionCloseRange is the range over which we will generate a random + // number of blocks to delay closing a session after its last channel + // has been closed. + SessionCloseRange uint32 + + // MaxTasksInMemQueue is the maximum number of backup tasks that should + // be kept in-memory. Any more tasks will overflow to disk. + MaxTasksInMemQueue uint64 +} + +// Manager manages the various tower clients that are active. A client is +// required for each different commitment transaction type. The Manager acts as +// a tower client multiplexer. +type Manager struct { + cfg *Config + + clients map[blob.Type]*TowerClient + clientsMu sync.Mutex +} + +// NewManager constructs a new Manager. +func NewManager(config *Config) (*Manager, error) { + // Copy the config to prevent side effects from modifying both the + // internal and external version of the Config. + cfg := *config + + // Set the read timeout to the default if none was provided. + if cfg.ReadTimeout <= 0 { + cfg.ReadTimeout = DefaultReadTimeout + } + + // Set the write timeout to the default if none was provided. + if cfg.WriteTimeout <= 0 { + cfg.WriteTimeout = DefaultWriteTimeout + } + + return &Manager{ + cfg: &cfg, + clients: make(map[blob.Type]*TowerClient), + }, nil +} + +// NewClient constructs a new TowerClient and adds it to the set of clients that +// the Manager is keeping track of. +func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + _, ok := m.clients[policy.BlobType] + if ok { + return nil, fmt.Errorf("a client with blob type %s has "+ + "already been registered", policy.BlobType) + } + + cfg := &towerClientCfg{ + Config: m.cfg, + Policy: policy, + } + + client, err := newTowerClient(cfg) + if err != nil { + return nil, err + } + + m.clients[policy.BlobType] = client + + return client, nil +} From 2abc422aac38620514c1f5667f5961daeffa217b Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 11:39:24 +0200 Subject: [PATCH 02/12] watchtower+server: let manager Start & Stop the clients In this commit, the `Stop` and `Start` methods are removed from the `Client` interface and instead added to the new `Manager`. Callers now only need to call the Manager to start or stop the clients instead of needing to call stop/start on each individual client. --- server.go | 25 +-- watchtower/wtclient/client.go | 272 +++++++++++++---------------- watchtower/wtclient/client_test.go | 29 +-- watchtower/wtclient/manager.go | 38 ++++ 4 files changed, 183 insertions(+), 181 deletions(-) diff --git a/server.go b/server.go index f23b4415d..b20c692ce 100644 --- a/server.go +++ b/server.go @@ -1913,19 +1913,12 @@ func (s *server) Start() error { } cleanup = cleanup.add(s.htlcNotifier.Stop) - if s.towerClient != nil { - if err := s.towerClient.Start(); err != nil { + if s.towerClientMgr != nil { + if err := s.towerClientMgr.Start(); err != nil { startErr = err return } - cleanup = cleanup.add(s.towerClient.Stop) - } - if s.anchorTowerClient != nil { - if err := s.anchorTowerClient.Start(); err != nil { - startErr = err - return - } - cleanup = cleanup.add(s.anchorTowerClient.Stop) + cleanup = cleanup.add(s.towerClientMgr.Stop) } if err := s.sweeper.Start(); err != nil { @@ -2298,16 +2291,10 @@ func (s *server) Stop() error { // client which will reliably flush all queued states to the // tower. If this is halted for any reason, the force quit timer // will kick in and abort to allow this method to return. 
- if s.towerClient != nil { - if err := s.towerClient.Stop(); err != nil { + if s.towerClientMgr != nil { + if err := s.towerClientMgr.Stop(); err != nil { srvrLog.Warnf("Unable to shut down tower "+ - "client: %v", err) - } - } - if s.anchorTowerClient != nil { - if err := s.anchorTowerClient.Stop(); err != nil { - srvrLog.Warnf("Unable to shut down anchor "+ - "tower client: %v", err) + "client manager: %v", err) } } diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 68839b8df..3441891ee 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -134,15 +134,6 @@ type Client interface { // successful unless the justice transaction would create dust outputs // when trying to abide by the negotiated policy. BackupState(chanID *lnwire.ChannelID, stateNum uint64) error - - // Start initializes the watchtower client, allowing it process requests - // to backup revoked channel states. - Start() error - - // Stop attempts a graceful shutdown of the watchtower client. In doing - // so, it will attempt to flush the pipeline and deliver any queued - // states to the tower before exiting. - Stop() error } // BreachRetributionBuilder is a function that can be used to construct a @@ -199,9 +190,6 @@ type towerClientCfg struct { // non-blocking, reliable subsystem for backing up revoked states to a specified // private tower. type TowerClient struct { - started sync.Once - stopped sync.Once - cfg *towerClientCfg log btclog.Logger @@ -420,170 +408,158 @@ func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID, return sessions, nil } -// Start initializes the watchtower client by loading or negotiating an active +// start initializes the watchtower client by loading or negotiating an active // session and then begins processing backup tasks from the request pipeline. -func (c *TowerClient) Start() error { - var returnErr error - c.started.Do(func() { - c.log.Infof("Watchtower client starting") +func (c *TowerClient) start() error { + c.log.Infof("Watchtower client starting") - // First, restart a session queue for any sessions that have - // committed but unacked state updates. This ensures that these - // sessions will be able to flush the committed updates after a - // restart. - fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates - for _, session := range c.candidateSessions { - committedUpdates, err := fetchCommittedUpdates( - &session.ID, - ) - if err != nil { - returnErr = err - return - } - - if len(committedUpdates) > 0 { - c.log.Infof("Starting session=%s to process "+ - "%d committed backups", session.ID, - len(committedUpdates)) - - c.initActiveQueue(session, committedUpdates) - } - } - - chanSub, err := c.cfg.SubscribeChannelEvents() - if err != nil { - returnErr = err - return - } - - // Iterate over the list of registered channels and check if - // any of them can be marked as closed. - for id := range c.chanInfos { - isClosed, closedHeight, err := c.isChannelClosed(id) - if err != nil { - returnErr = err - return - } - - if !isClosed { - continue - } - - _, err = c.cfg.DB.MarkChannelClosed(id, closedHeight) - if err != nil { - c.log.Errorf("could not mark channel(%s) as "+ - "closed: %v", id, err) - - continue - } - - // Since the channel has been marked as closed, we can - // also remove it from the channel summaries map. - delete(c.chanInfos, id) - } - - // Load all closable sessions. 
- closableSessions, err := c.cfg.DB.ListClosableSessions() - if err != nil { - returnErr = err - return - } - - err = c.trackClosableSessions(closableSessions) - if err != nil { - returnErr = err - return - } - - c.wg.Add(1) - go c.handleChannelCloses(chanSub) - - // Subscribe to new block events. - blockEvents, err := c.cfg.ChainNotifier.RegisterBlockEpochNtfn( - nil, + // First, restart a session queue for any sessions that have + // committed but unacked state updates. This ensures that these + // sessions will be able to flush the committed updates after a + // restart. + fetchCommittedUpdates := c.cfg.DB.FetchSessionCommittedUpdates + for _, session := range c.candidateSessions { + committedUpdates, err := fetchCommittedUpdates( + &session.ID, ) if err != nil { - returnErr = err - return + return err } - c.wg.Add(1) - go c.handleClosableSessions(blockEvents) + if len(committedUpdates) > 0 { + c.log.Infof("Starting session=%s to process "+ + "%d committed backups", session.ID, + len(committedUpdates)) - // Now start the session negotiator, which will allow us to - // request new session as soon as the backupDispatcher starts - // up. - err = c.negotiator.Start() + c.initActiveQueue(session, committedUpdates) + } + } + + chanSub, err := c.cfg.SubscribeChannelEvents() + if err != nil { + return err + } + + // Iterate over the list of registered channels and check if + // any of them can be marked as closed. + for id := range c.chanInfos { + isClosed, closedHeight, err := c.isChannelClosed(id) if err != nil { - returnErr = err - return + return err } - // Start the task pipeline to which new backup tasks will be - // submitted from active links. - err = c.pipeline.Start() + if !isClosed { + continue + } + + _, err = c.cfg.DB.MarkChannelClosed(id, closedHeight) if err != nil { - returnErr = err - return + c.log.Errorf("could not mark channel(%s) as "+ + "closed: %v", id, err) + + continue } - c.wg.Add(1) - go c.backupDispatcher() + // Since the channel has been marked as closed, we can + // also remove it from the channel summaries map. + delete(c.chanInfos, id) + } - c.log.Infof("Watchtower client started successfully") - }) - return returnErr + // Load all closable sessions. + closableSessions, err := c.cfg.DB.ListClosableSessions() + if err != nil { + return err + } + + err = c.trackClosableSessions(closableSessions) + if err != nil { + return err + } + + c.wg.Add(1) + go c.handleChannelCloses(chanSub) + + // Subscribe to new block events. + blockEvents, err := c.cfg.ChainNotifier.RegisterBlockEpochNtfn( + nil, + ) + if err != nil { + return err + } + + c.wg.Add(1) + go c.handleClosableSessions(blockEvents) + + // Now start the session negotiator, which will allow us to + // request new session as soon as the backupDispatcher starts + // up. + err = c.negotiator.Start() + if err != nil { + return err + } + + // Start the task pipeline to which new backup tasks will be + // submitted from active links. + err = c.pipeline.Start() + if err != nil { + return err + } + + c.wg.Add(1) + go c.backupDispatcher() + + c.log.Infof("Watchtower client started successfully") + + return nil } -// Stop idempotently initiates a graceful shutdown of the watchtower client. -func (c *TowerClient) Stop() error { +// stop idempotently initiates a graceful shutdown of the watchtower client. +func (c *TowerClient) stop() error { var returnErr error - c.stopped.Do(func() { - c.log.Debugf("Stopping watchtower client") + c.log.Debugf("Stopping watchtower client") - // 1. Stop the session negotiator. 
- err := c.negotiator.Stop() + // 1. Stop the session negotiator. + err := c.negotiator.Stop() + if err != nil { + returnErr = err + } + + // 2. Stop the backup dispatcher and any other goroutines. + close(c.quit) + c.wg.Wait() + + // 3. If there was a left over 'prevTask' from the backup + // dispatcher, replay that onto the pipeline. + if c.prevTask != nil { + err = c.pipeline.QueueBackupID(c.prevTask) if err != nil { returnErr = err } + } - // 2. Stop the backup dispatcher and any other goroutines. - close(c.quit) - c.wg.Wait() - - // 3. If there was a left over 'prevTask' from the backup - // dispatcher, replay that onto the pipeline. - if c.prevTask != nil { - err = c.pipeline.QueueBackupID(c.prevTask) + // 4. Shutdown all active session queues in parallel. These will + // exit once all unhandled updates have been replayed to the + // task pipeline. + c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { + return func() { + err := s.Stop(false) if err != nil { + c.log.Errorf("could not stop session "+ + "queue: %s: %v", s.ID(), err) + returnErr = err } } - - // 4. Shutdown all active session queues in parallel. These will - // exit once all unhandled updates have been replayed to the - // task pipeline. - c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { - return func() { - err := s.Stop(false) - if err != nil { - c.log.Errorf("could not stop session "+ - "queue: %s: %v", s.ID(), err) - - returnErr = err - } - } - }) - - // 5. Shutdown the backup queue, which will prevent any further - // updates from being accepted. - if err = c.pipeline.Stop(); err != nil { - returnErr = err - } - - c.log.Debugf("Client successfully stopped, stats: %s", c.stats) }) + // 5. Shutdown the backup queue, which will prevent any further + // updates from being accepted. + if err = c.pipeline.Stop(); err != nil { + returnErr = err + } + + c.log.Debugf("Client successfully stopped, stats: %s", c.stats) + return returnErr } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 00404b40f..ee383b495 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -399,6 +399,7 @@ type testHarness struct { cfg harnessCfg signer *wtmock.MockSigner capacity lnwire.MilliSatoshi + clientMgr *wtclient.Manager clientDB *wtdb.ClientDB clientCfg *wtclient.Config clientPolicy wtpolicy.Policy @@ -526,7 +527,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { h.startClient() t.Cleanup(func() { - require.NoError(t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) require.NoError(t, h.clientDB.Close()) }) @@ -560,12 +561,12 @@ func (h *testHarness) startClient() { Address: towerTCPAddr, } - m, err := wtclient.NewManager(h.clientCfg) + h.clientMgr, err = wtclient.NewManager(h.clientCfg) require.NoError(h.t, err) - h.client, err = m.NewClient(h.clientPolicy) + h.client, err = h.clientMgr.NewClient(h.clientPolicy) require.NoError(h.t, err) - require.NoError(h.t, h.client.Start()) + require.NoError(h.t, h.clientMgr.Start()) require.NoError(h.t, h.client.AddTower(towerAddr)) } @@ -1127,7 +1128,7 @@ var clientTests = []clientTest{ ) // Stop the client, subsequent backups should fail. - h.client.Stop() + require.NoError(h.t, h.clientMgr.Stop()) // Advance the channel and try to back up the states. We // expect ErrClientExiting to be returned from @@ -1242,7 +1243,7 @@ var clientTests = []clientTest{ // Stop the client to abort the state updates it has // queued. 
- require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) // Restart the server and allow it to ack the updates // after the client retransmits the unacked update. @@ -1437,7 +1438,7 @@ var clientTests = []clientTest{ h.server.waitForUpdates(nil, waitTime) // Stop the client since it has queued backups. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) // Restart the server and allow it to ack session // creation. @@ -1487,7 +1488,7 @@ var clientTests = []clientTest{ h.server.waitForUpdates(nil, waitTime) // Stop the client since it has queued backups. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) // Restart the server and allow it to ack session // creation. @@ -1541,7 +1542,7 @@ var clientTests = []clientTest{ h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Stop the client, which should have no more backups. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) // Record the policy that the first half was stored // under. We'll expect the second half to also be @@ -1602,7 +1603,7 @@ var clientTests = []clientTest{ // Restart the client, so we can ensure the deduping is // maintained across restarts. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) h.startClient() // Try to back up the full range of retributions. Only @@ -1882,7 +1883,7 @@ var clientTests = []clientTest{ require.False(h.t, h.isSessionClosable(sessionIDs[0])) // Restart the client. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) h.startClient() // The session should now have been marked as closable. @@ -2069,7 +2070,7 @@ var clientTests = []clientTest{ h.server.waitForUpdates(hints[:numUpdates/2], waitTime) // Now stop the client and reset its database. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) db := newClientDB(h.t) h.clientDB = db @@ -2122,7 +2123,7 @@ var clientTests = []clientTest{ h.backupStates(chanID, 0, numUpdates/2, nil) // Restart the Client. And also now start the server. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) h.server.start() h.startClient() @@ -2395,7 +2396,7 @@ var clientTests = []clientTest{ // Now restart the client. This ensures that the // updates are no longer in the pending queue. - require.NoError(h.t, h.client.Stop()) + require.NoError(h.t, h.clientMgr.Stop()) h.startClient() // Now remove the tower. diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index 922766d2d..b26ef3a6f 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -100,6 +100,9 @@ type Config struct { // required for each different commitment transaction type. The Manager acts as // a tower client multiplexer. type Manager struct { + started sync.Once + stopped sync.Once + cfg *Config clients map[blob.Type]*TowerClient @@ -154,3 +157,38 @@ func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { return client, nil } + +// Start starts all the clients that have been registered with the Manager. +func (m *Manager) Start() error { + var returnErr error + m.started.Do(func() { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + for _, client := range m.clients { + if err := client.start(); err != nil { + returnErr = err + return + } + } + }) + + return returnErr +} + +// Stop stops all the clients that the Manger is managing. 
+func (m *Manager) Stop() error { + var returnErr error + m.stopped.Do(func() { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + for _, client := range m.clients { + if err := client.stop(); err != nil { + returnErr = err + } + } + }) + + return returnErr +} From a44bf381c4e8ab9ca1c57238ab5dacbbfe8ab973 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 16 May 2023 13:28:11 +0200 Subject: [PATCH 03/12] multi: move AddTower to Tower Client Manager In this commit we move the AddTower method from the Client interface to the TowerClientManager interface. The wtclientrpc is updated to call the `AddTower` method of the Manager instead of calling the `AddTower` method of each individual client. The TowerClient now is also only concerned with adding a new tower (or new tower address) to its in-memory state; the tower Manager will handle adding the tower to the DB. --- lnrpc/wtclientrpc/config.go | 4 +++ lnrpc/wtclientrpc/wtclient.go | 6 +--- rpcserver.go | 2 +- subrpcserver_config.go | 4 +++ watchtower/wtclient/client.go | 37 ++++++---------------- watchtower/wtclient/client_test.go | 10 +++--- watchtower/wtclient/manager.go | 51 ++++++++++++++++++++++++++++++ 7 files changed, 75 insertions(+), 39 deletions(-) diff --git a/lnrpc/wtclientrpc/config.go b/lnrpc/wtclientrpc/config.go index 9127c0846..ed0401b13 100644 --- a/lnrpc/wtclientrpc/config.go +++ b/lnrpc/wtclientrpc/config.go @@ -23,6 +23,10 @@ type Config struct { // we'll interact through the watchtower RPC subserver. AnchorClient wtclient.Client + // ClientMgr is a tower client manager that manages a set of tower + // clients. + ClientMgr wtclient.TowerClientManager + // Resolver is a custom resolver that will be used to resolve watchtower // addresses to ensure we don't leak any information when running over // non-clear networks, e.g. Tor, etc. 
diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index aa7e1ae21..ad99f0541 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context, Address: addr, } - // TODO(conner): make atomic via multiplexed client - if err := c.cfg.Client.AddTower(towerAddr); err != nil { - return nil, err - } - if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil { + if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil { return nil, err } diff --git a/rpcserver.go b/rpcserver.go index fa23f0485..4685b731b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -744,7 +744,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter, routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB, s.sweeper, tower, s.towerClient, s.anchorTowerClient, - r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, + s.towerClientMgr, r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, genAmpInvoiceFeatures, s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr, rpcsLog, s.aliasMgr.GetPeerAlias, diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 7706dfd27..27bef9f92 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -115,6 +115,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, tower *watchtower.Standalone, towerClient wtclient.Client, anchorTowerClient wtclient.Client, + towerClientMgr wtclient.TowerClientManager, tcpResolver lncfg.TCPResolver, genInvoiceFeatures func() *lnwire.FeatureVector, genAmpInvoiceFeatures func() *lnwire.FeatureVector, @@ -297,6 +298,9 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, subCfgValue.FieldByName("AnchorClient").Set( reflect.ValueOf(anchorTowerClient), ) + subCfgValue.FieldByName("ClientMgr").Set( + reflect.ValueOf(towerClientMgr), + ) } subCfgValue.FieldByName("Resolver").Set( reflect.ValueOf(tcpResolver), diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 3441891ee..e1749374a 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,12 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // AddTower adds a new watchtower reachable at the given address and - // considers it for new sessions. If the watchtower already exists, then - // any new addresses included will be considered when dialing it for - // session negotiations and backups. - AddTower(*lnwire.NetAddress) error - // RemoveTower removes a watchtower from being considered for future // session negotiations and from being used for any subsequent backups // until it's added again. If an address is provided, then this call @@ -145,9 +139,9 @@ type BreachRetributionBuilder func(id lnwire.ChannelID, // newTowerMsg is an internal message we'll use within the TowerClient to signal // that a new tower can be considered. type newTowerMsg struct { - // addr is the tower's reachable address that we'll use to establish a - // connection with. - addr *lnwire.NetAddress + // tower holds the info about the new Tower or new tower address + // required to connect to it. + tower *Tower // errChan is the channel through which we'll send a response back to // the caller when handling their request. 
@@ -1071,7 +1065,7 @@ func (c *TowerClient) backupDispatcher() { // its corresponding sessions, if any, as new // candidates. case msg := <-c.newTowers: - msg.errChan <- c.handleNewTower(msg) + msg.errChan <- c.handleNewTower(msg.tower) // A tower has been requested to be removed. We'll // only allow removal of it if the address in question @@ -1155,7 +1149,7 @@ func (c *TowerClient) backupDispatcher() { // its corresponding sessions, if any, as new // candidates. case msg := <-c.newTowers: - msg.errChan <- c.handleNewTower(msg) + msg.errChan <- c.handleNewTower(msg.tower) // A tower has been removed, so we'll remove certain // information that's persisted and also in our @@ -1451,16 +1445,16 @@ func (c *TowerClient) isChannelClosed(id lnwire.ChannelID) (bool, uint32, return true, chanSum.CloseHeight, nil } -// AddTower adds a new watchtower reachable at the given address and considers +// addTower adds a new watchtower reachable at the given address and considers // it for new sessions. If the watchtower already exists, then any new addresses // included will be considered when dialing it for session negotiations and // backups. -func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { +func (c *TowerClient) addTower(tower *Tower) error { errChan := make(chan error, 1) select { case c.newTowers <- &newTowerMsg{ - addr: addr, + tower: tower, errChan: errChan, }: case <-c.pipeline.quit: @@ -1478,20 +1472,7 @@ func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { // handleNewTower handles a request for a new tower to be added. If the tower // already exists, then its corresponding sessions, if any, will be set // considered as candidates. -func (c *TowerClient) handleNewTower(msg *newTowerMsg) error { - // We'll start by updating our persisted state, followed by our - // in-memory state, with the new tower. This might not actually be a new - // tower, but it might include a new address at which it can be reached. - dbTower, err := c.cfg.DB.CreateTower(msg.addr) - if err != nil { - return err - } - - tower, err := NewTowerFromDBTower(dbTower) - if err != nil { - return err - } - +func (c *TowerClient) handleNewTower(tower *Tower) error { c.candidateTowers.AddCandidate(tower) // Include all of its corresponding sessions to our set of candidates. diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index ee383b495..6be1bf496 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -567,7 +567,7 @@ func (h *testHarness) startClient() { h.client, err = h.clientMgr.NewClient(h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.clientMgr.Start()) - require.NoError(h.t, h.client.AddTower(towerAddr)) + require.NoError(h.t, h.clientMgr.AddTower(towerAddr)) } // chanIDFromInt creates a unique channel id given a unique integral id. @@ -757,7 +757,7 @@ func (h *testHarness) recvPayments(id, from, to uint64, func (h *testHarness) addTower(addr *lnwire.NetAddress) { h.t.Helper() - err := h.client.AddTower(addr) + err := h.clientMgr.AddTower(addr) require.NoError(h.t, err) } @@ -1714,7 +1714,7 @@ var clientTests = []clientTest{ h.server.addr = towerAddr // Add the new tower address to the client. - err = h.client.AddTower(towerAddr) + err = h.clientMgr.AddTower(towerAddr) require.NoError(h.t, err) // Remove the old tower address from the client. 
@@ -1795,11 +1795,11 @@ var clientTests = []clientTest{ require.NoError(h.t, h.server.server.Start()) // Re-add the server to the client - err = h.client.AddTower(h.server.addr) + err = h.clientMgr.AddTower(h.server.addr) require.NoError(h.t, err) // Also add the new tower address. - err = h.client.AddTower(towerAddr) + err = h.clientMgr.AddTower(towerAddr) require.NoError(h.t, err) // Assert that if the client attempts to remove the diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index b26ef3a6f..d583d6c52 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -16,6 +16,16 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtpolicy" ) +// TowerClientManager is the primary interface used by the daemon to control a +// client's lifecycle and backup revoked states. +type TowerClientManager interface { + // AddTower adds a new watchtower reachable at the given address and + // considers it for new sessions. If the watchtower already exists, then + // any new addresses included will be considered when dialing it for + // session negotiations and backups. + AddTower(*lnwire.NetAddress) error +} + // Config provides the TowerClient with access to the resources it requires to // perform its duty. All nillable fields must be non-nil for the tower to be // initialized properly. @@ -109,6 +119,8 @@ type Manager struct { clientsMu sync.Mutex } +var _ TowerClientManager = (*Manager)(nil) + // NewManager constructs a new Manager. func NewManager(config *Config) (*Manager, error) { // Copy the config to prevent side effects from modifying both the @@ -192,3 +204,42 @@ func (m *Manager) Stop() error { return returnErr } + +// AddTower adds a new watchtower reachable at the given address and considers +// it for new sessions. If the watchtower already exists, then any new addresses +// included will be considered when dialing it for session negotiations and +// backups. +func (m *Manager) AddTower(address *lnwire.NetAddress) error { + // We'll start by updating our persisted state, followed by the + // in-memory state of each client, with the new tower. This might not + // actually be a new tower, but it might include a new address at which + // it can be reached. + dbTower, err := m.cfg.DB.CreateTower(address) + if err != nil { + return err + } + + tower, err := NewTowerFromDBTower(dbTower) + if err != nil { + return err + } + + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + for blobType, client := range m.clients { + clientType, err := blobType.Identifier() + if err != nil { + return err + } + + if err := client.addTower(tower); err != nil { + return fmt.Errorf("could not add tower(%x) to the %s "+ + "tower client: %w", + tower.IdentityKey.SerializeCompressed(), + clientType, err) + } + } + + return nil +} From a5e7d35af27702b7280d4c4433f382c76ef25a71 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 13:17:05 +0200 Subject: [PATCH 04/12] wtclient+lnrpc: move RemoveTower to Manager Simiarly to the previous commit, this commit moves the RemoveTower method from the Client to the TowerClientManager interface. The manager handles any DB related handling. The manager will first attempt to remove the tower from the in-memory state of each client and then will attempt to remove the tower from the DB. If the removal from the DB fails, the manager will re-add the tower to the in-memory state of each client. 
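For reference, the removal flow described above reduces to a small compensate-on-failure pattern. The sketch below is a simplified paraphrase of the `(*Manager).RemoveTower` method added in this patch; locking and error wrapping are elided and the method name is shortened for the example:

    // removeTowerSketch: remove the tower from every client's in-memory
    // state first, then persist the removal; if the DB update fails,
    // restore the in-memory state so both views stay consistent.
    func (m *Manager) removeTowerSketch(key *btcec.PublicKey,
        addr net.Addr) error {

        // Load the tower up front so it can be restored on failure.
        dbTower, err := m.cfg.DB.LoadTower(key)
        if err != nil {
            return err
        }

        // 1. Update each client's in-memory candidate set.
        for _, client := range m.clients {
            if err := client.removeTower(dbTower.ID, key, addr); err != nil {
                return err
            }
        }

        // 2. Persist the removal, rolling back the in-memory change on
        //    failure.
        if err := m.cfg.DB.RemoveTower(key, addr); err != nil {
            if tower, tErr := NewTowerFromDBTower(dbTower); tErr == nil {
                for _, client := range m.clients {
                    _ = client.addTower(tower)
                }
            }

            return err
        }

        return nil
    }
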
--- lnrpc/wtclientrpc/wtclient.go | 7 +--- watchtower/wtclient/client.go | 65 +++++------------------------- watchtower/wtclient/client_test.go | 18 ++++----- watchtower/wtclient/manager.go | 56 +++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 69 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index ad99f0541..4275ec25a 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -243,12 +243,7 @@ func (c *WatchtowerClient) RemoveTower(ctx context.Context, } } - // TODO(conner): make atomic via multiplexed client - err = c.cfg.Client.RemoveTower(pubKey, addr) - if err != nil { - return nil, err - } - err = c.cfg.AnchorClient.RemoveTower(pubKey, addr) + err = c.cfg.ClientMgr.RemoveTower(pubKey, addr) if err != nil { return nil, err } diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index e1749374a..6cea916ee 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,13 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // RemoveTower removes a watchtower from being considered for future - // session negotiations and from being used for any subsequent backups - // until it's added again. If an address is provided, then this call - // only serves as a way of removing the address from the watchtower - // instead. - RemoveTower(*btcec.PublicKey, net.Addr) error - // RegisteredTowers retrieves the list of watchtowers registered with // the client. RegisteredTowers(...wtdb.ClientSessionListOption) ([]*RegisteredTower, @@ -153,6 +146,9 @@ type newTowerMsg struct { // staleTowerMsg is an internal message we'll use within the TowerClient to // signal that a tower should no longer be considered. type staleTowerMsg struct { + // id is the unique database identifier for the tower. + id wtdb.TowerID + // pubKey is the identifying public key of the watchtower. pubKey *btcec.PublicKey @@ -1492,17 +1488,18 @@ func (c *TowerClient) handleNewTower(tower *Tower) error { return nil } -// RemoveTower removes a watchtower from being considered for future session +// removeTower removes a watchtower from being considered for future session // negotiations and from being used for any subsequent backups until it's added // again. If an address is provided, then this call only serves as a way of // removing the address from the watchtower instead. -func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, +func (c *TowerClient) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey, addr net.Addr) error { errChan := make(chan error, 1) select { case c.staleTowers <- &staleTowerMsg{ + id: id, pubKey: pubKey, addr: addr, errChan: errChan, @@ -1524,9 +1521,8 @@ func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, // inactive and removed as candidates. If the active session queue corresponds // to any of these sessions, a new one will be negotiated. func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { - // We'll load the tower before potentially removing it in order to - // retrieve its ID within the database. - dbTower, err := c.cfg.DB.LoadTower(msg.pubKey) + // We'll first update our in-memory state. 
+ err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr) if err != nil { return err } @@ -1534,19 +1530,14 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { // If an address was provided, then we're only meant to remove the // address from the tower. if msg.addr != nil { - return c.removeTowerAddr(dbTower, msg.addr) + return nil } // Otherwise, the tower should no longer be used for future session - // negotiations and backups. First, we'll update our in-memory state - // with the stale tower. - err = c.candidateTowers.RemoveCandidate(dbTower.ID, nil) - if err != nil { - return err - } + // negotiations and backups. pubKey := msg.pubKey.SerializeCompressed() - sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID) + sessions, err := c.cfg.DB.ListClientSessions(&msg.id) if err != nil { return fmt.Errorf("unable to retrieve sessions for tower %x: "+ "%v", pubKey, err) @@ -1573,40 +1564,6 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { } } - // Finally, we will update our persisted state with the stale tower. - return c.cfg.DB.RemoveTower(msg.pubKey, nil) -} - -// removeTowerAddr removes the given address from the tower. -func (c *TowerClient) removeTowerAddr(tower *wtdb.Tower, addr net.Addr) error { - if addr == nil { - return fmt.Errorf("an address must be provided") - } - - // We'll first update our in-memory state followed by our persisted - // state with the stale tower. The removal of the tower address from - // the in-memory state will fail if the address is currently being used - // for a session negotiation. - err := c.candidateTowers.RemoveCandidate(tower.ID, addr) - if err != nil { - return err - } - - err = c.cfg.DB.RemoveTower(tower.IdentityKey, addr) - if err != nil { - // If the persisted state update fails, re-add the address to - // our in-memory state. - tower, newTowerErr := NewTowerFromDBTower(tower) - if newTowerErr != nil { - log.Errorf("could not create new in-memory tower: %v", - newTowerErr) - } else { - c.candidateTowers.AddCandidate(tower) - } - - return err - } - return nil } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 6be1bf496..f1deb697a 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -766,7 +766,7 @@ func (h *testHarness) addTower(addr *lnwire.NetAddress) { func (h *testHarness) removeTower(pubKey *btcec.PublicKey, addr net.Addr) { h.t.Helper() - err := h.client.RemoveTower(pubKey, addr) + err := h.clientMgr.RemoveTower(pubKey, addr) require.NoError(h.t, err) } @@ -1718,7 +1718,7 @@ var clientTests = []clientTest{ require.NoError(h.t, err) // Remove the old tower address from the client. - err = h.client.RemoveTower( + err = h.clientMgr.RemoveTower( towerAddr.IdentityKey, oldAddr, ) require.NoError(h.t, err) @@ -1752,7 +1752,7 @@ var clientTests = []clientTest{ // negotiation with the server will be in progress, so // the client should be able to remove the server. err := wait.NoError(func() error { - return h.client.RemoveTower( + return h.clientMgr.RemoveTower( h.server.addr.IdentityKey, nil, ) }, waitTime) @@ -1807,7 +1807,7 @@ var clientTests = []clientTest{ // address currently being locked for session // negotiation. err = wait.Predicate(func() bool { - err = h.client.RemoveTower( + err = h.clientMgr.RemoveTower( h.server.addr.IdentityKey, h.server.addr.Address, ) @@ -1818,7 +1818,7 @@ var clientTests = []clientTest{ // Assert that the second address can be removed since // it is not being used for session negotiation. 
err = wait.NoError(func() error { - return h.client.RemoveTower( + return h.clientMgr.RemoveTower( h.server.addr.IdentityKey, towerTCPAddr, ) }, waitTime) @@ -1830,7 +1830,7 @@ var clientTests = []clientTest{ // Assert that the client can now remove the first // address. err = wait.NoError(func() error { - return h.client.RemoveTower( + return h.clientMgr.RemoveTower( h.server.addr.IdentityKey, nil, ) }, waitTime) @@ -2223,7 +2223,7 @@ var clientTests = []clientTest{ // Now we can remove the old one. err := wait.Predicate(func() bool { - err := h.client.RemoveTower( + err := h.clientMgr.RemoveTower( h.server.addr.IdentityKey, nil, ) @@ -2309,7 +2309,7 @@ var clientTests = []clientTest{ require.NoError(h.t, err) // Now remove the tower. - err = h.client.RemoveTower( + err = h.clientMgr.RemoveTower( h.server.addr.IdentityKey, nil, ) require.NoError(h.t, err) @@ -2400,7 +2400,7 @@ var clientTests = []clientTest{ h.startClient() // Now remove the tower. - err = h.client.RemoveTower( + err = h.clientMgr.RemoveTower( h.server.addr.IdentityKey, nil, ) require.NoError(h.t, err) diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index d583d6c52..b28268f62 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -2,9 +2,11 @@ package wtclient import ( "fmt" + "net" "sync" "time" + "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -24,6 +26,13 @@ type TowerClientManager interface { // any new addresses included will be considered when dialing it for // session negotiations and backups. AddTower(*lnwire.NetAddress) error + + // RemoveTower removes a watchtower from being considered for future + // session negotiations and from being used for any subsequent backups + // until it's added again. If an address is provided, then this call + // only serves as a way of removing the address from the watchtower + // instead. + RemoveTower(*btcec.PublicKey, net.Addr) error } // Config provides the TowerClient with access to the resources it requires to @@ -243,3 +252,50 @@ func (m *Manager) AddTower(address *lnwire.NetAddress) error { return nil } + +// RemoveTower removes a watchtower from being considered for future session +// negotiations and from being used for any subsequent backups until it's added +// again. If an address is provided, then this call only serves as a way of +// removing the address from the watchtower instead. +func (m *Manager) RemoveTower(key *btcec.PublicKey, addr net.Addr) error { + // We'll load the tower before potentially removing it in order to + // retrieve its ID within the database. + dbTower, err := m.cfg.DB.LoadTower(key) + if err != nil { + return err + } + + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + for _, client := range m.clients { + err := client.removeTower(dbTower.ID, key, addr) + if err != nil { + return err + } + } + + if err := m.cfg.DB.RemoveTower(key, addr); err != nil { + // If the persisted state update fails, re-add the address to + // our client's in-memory state. 
+ tower, newTowerErr := NewTowerFromDBTower(dbTower) + if newTowerErr != nil { + log.Errorf("could not create new in-memory tower: %v", + newTowerErr) + + return err + } + + for _, client := range m.clients { + addTowerErr := client.addTower(tower) + if addTowerErr != nil { + log.Errorf("could not re-add tower: %v", + addTowerErr) + } + } + + return err + } + + return nil +} From f38b5cf258d5656a05e621bc70848e5898143e4f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 13:54:28 +0200 Subject: [PATCH 05/12] lnrpc+wtclient: refactor ClientStats This commit removes the mutex from ClientStats and instead puts that in clientStats which wraps ClientStats with a mutex. This is so that the tower client interface can return a ClientStats struct without worrying about copying a mutex. --- lnrpc/wtclientrpc/wtclient.go | 10 +++---- watchtower/wtclient/client.go | 6 ++--- watchtower/wtclient/stats.go | 49 ++++++++++++++++++++--------------- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index 4275ec25a..756eec2bf 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -413,8 +413,8 @@ func constructFunctionalOptions(includeSessions, } // Stats returns the in-memory statistics of the client since startup. -func (c *WatchtowerClient) Stats(ctx context.Context, - req *StatsRequest) (*StatsResponse, error) { +func (c *WatchtowerClient) Stats(_ context.Context, + _ *StatsRequest) (*StatsResponse, error) { if err := c.isActive(); err != nil { return nil, err @@ -426,11 +426,7 @@ func (c *WatchtowerClient) Stats(ctx context.Context, } var stats wtclient.ClientStats - for i := range clientStats { - // Grab a reference to the slice index rather than copying bc - // ClientStats contains a lock which cannot be copied by value. - stat := &clientStats[i] - + for _, stat := range clientStats { stats.NumTasksAccepted += stat.NumTasksAccepted stats.NumTasksIneligible += stat.NumTasksIneligible stats.NumTasksPending += stat.NumTasksPending diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 6cea916ee..5714c382c 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -200,7 +200,7 @@ type TowerClient struct { chanInfos wtdb.ChannelInfos statTicker *time.Ticker - stats *ClientStats + stats *clientStats newTowers chan *newTowerMsg staleTowers chan *staleTowerMsg @@ -245,7 +245,7 @@ func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { chanInfos: chanInfos, closableSessionQueue: newSessionCloseMinHeap(), statTicker: time.NewTicker(DefaultStatInterval), - stats: new(ClientStats), + stats: new(clientStats), newTowers: make(chan *newTowerMsg), staleTowers: make(chan *staleTowerMsg), quit: make(chan struct{}), @@ -1637,7 +1637,7 @@ func (c *TowerClient) LookupTower(pubKey *btcec.PublicKey, // Stats returns the in-memory statistics of the client since startup. func (c *TowerClient) Stats() ClientStats { - return c.stats.Copy() + return c.stats.getStatsCopy() } // Policy returns the active client policy configuration. diff --git a/watchtower/wtclient/stats.go b/watchtower/wtclient/stats.go index b41783ff3..35303e925 100644 --- a/watchtower/wtclient/stats.go +++ b/watchtower/wtclient/stats.go @@ -8,8 +8,6 @@ import ( // ClientStats is a collection of in-memory statistics of the actions the client // has performed since its creation. 
 type ClientStats struct {
-	mu sync.Mutex
-
 	// NumTasksPending is the total number of backups that are pending to
 	// be acknowledged by all active and exhausted watchtower sessions.
 	NumTasksPending int
@@ -31,68 +29,77 @@ type ClientStats struct {
 	NumSessionsExhausted int
 }
 
+// clientStats wraps ClientStats with a mutex so that its members can be
+// accessed in a thread-safe manner.
+type clientStats struct {
+	mu sync.Mutex
+
+	ClientStats
+}
+
 // taskReceived increments the number of backup requests the client has received
 // from active channels.
-func (s *ClientStats) taskReceived() {
+func (s *clientStats) taskReceived() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	s.NumTasksPending++
 }
 
 // taskAccepted increments the number of tasks that have been assigned to active
 // session queues, and are awaiting upload to a tower.
-func (s *ClientStats) taskAccepted() {
+func (s *clientStats) taskAccepted() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	s.NumTasksAccepted++
 	s.NumTasksPending--
 }
 
+// getStatsCopy returns a copy of the ClientStats.
+func (s *clientStats) getStatsCopy() ClientStats {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.ClientStats
+}
+
 // taskIneligible increments the number of tasks that were unable to satisfy the
 // active session queue's policy. These can potentially be retried later, but
 // typically this means that the balance created dust outputs, so it may not be
 // worth backing up at all.
-func (s *ClientStats) taskIneligible() {
+func (s *clientStats) taskIneligible() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	s.NumTasksIneligible++
 }
 
 // sessionAcquired increments the number of sessions that have been successfully
 // negotiated by the client during this execution.
-func (s *ClientStats) sessionAcquired() {
+func (s *clientStats) sessionAcquired() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	s.NumSessionsAcquired++
 }
 
 // sessionExhausted increments the number of session that have become full as a
 // result of accepting backup tasks.
-func (s *ClientStats) sessionExhausted() {
+func (s *clientStats) sessionExhausted() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	s.NumSessionsExhausted++
 }
 
 // String returns a human-readable summary of the client's metrics.
-func (s *ClientStats) String() string {
+func (s *clientStats) String() string {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	return fmt.Sprintf("tasks(received=%d accepted=%d ineligible=%d) "+
 		"sessions(acquired=%d exhausted=%d)",
 		s.NumTasksPending, s.NumTasksAccepted, s.NumTasksIneligible,
 		s.NumSessionsAcquired, s.NumSessionsExhausted)
 }
-
-// Copy returns a copy of the current stats.
-func (s *ClientStats) Copy() ClientStats {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return ClientStats{
-		NumTasksPending:      s.NumTasksPending,
-		NumTasksAccepted:     s.NumTasksAccepted,
-		NumTasksIneligible:   s.NumTasksIneligible,
-		NumSessionsAcquired:  s.NumSessionsAcquired,
-		NumSessionsExhausted: s.NumSessionsExhausted,
-	}
-}

From 4348f2062ab2b7e1e4363a487ec0fc4d89c944ee Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Tue, 16 May 2023 14:03:14 +0200
Subject: [PATCH 06/12] wtclient+lnrpc: move Stats to Manager

Move the `Stats` method from the `Client` to the `Manager` interface.
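
For illustration, the aggregation that the manager-level Stats method performs
across its per-policy clients can be sketched as follows. This is a minimal,
self-contained example, not the actual wtclient API: the struct mirrors the
exported ClientStats fields, while the string map key stands in for the real
policy (blob type) key.

    package main

    import "fmt"

    // ClientStats mirrors the counters that each tower client tracks.
    type ClientStats struct {
            NumTasksPending      int
            NumTasksAccepted     int
            NumTasksIneligible   int
            NumSessionsAcquired  int
            NumSessionsExhausted int
    }

    // sumStats folds the per-client counters into a single value, which is
    // what the manager-level Stats method does across its policy-keyed
    // clients.
    func sumStats(clients map[string]ClientStats) ClientStats {
            var total ClientStats
            for _, s := range clients {
                    total.NumTasksPending += s.NumTasksPending
                    total.NumTasksAccepted += s.NumTasksAccepted
                    total.NumTasksIneligible += s.NumTasksIneligible
                    total.NumSessionsAcquired += s.NumSessionsAcquired
                    total.NumSessionsExhausted += s.NumSessionsExhausted
            }

            return total
    }

    func main() {
            perClient := map[string]ClientStats{
                    "legacy": {NumTasksAccepted: 3, NumSessionsAcquired: 1},
                    "anchor": {NumTasksAccepted: 2, NumTasksPending: 1},
            }

            fmt.Printf("%+v\n", sumStats(perClient))
    }

With this in place, the RPC layer calls a single Stats method on the manager
instead of summing the per-client values itself.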
--- lnrpc/wtclientrpc/wtclient.go | 14 +------------- watchtower/wtclient/client.go | 7 ++----- watchtower/wtclient/manager.go | 22 ++++++++++++++++++++++ 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index 756eec2bf..a6b1fa0c0 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -420,19 +420,7 @@ func (c *WatchtowerClient) Stats(_ context.Context, return nil, err } - clientStats := []wtclient.ClientStats{ - c.cfg.Client.Stats(), - c.cfg.AnchorClient.Stats(), - } - - var stats wtclient.ClientStats - for _, stat := range clientStats { - stats.NumTasksAccepted += stat.NumTasksAccepted - stats.NumTasksIneligible += stat.NumTasksIneligible - stats.NumTasksPending += stat.NumTasksPending - stats.NumSessionsAcquired += stat.NumSessionsAcquired - stats.NumSessionsExhausted += stat.NumSessionsExhausted - } + stats := c.cfg.ClientMgr.Stats() return &StatsResponse{ NumBackups: uint32(stats.NumTasksAccepted), diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 5714c382c..d7d3ef900 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -104,9 +104,6 @@ type Client interface { LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) (*RegisteredTower, error) - // Stats returns the in-memory statistics of the client since startup. - Stats() ClientStats - // Policy returns the active client policy configuration. Policy() wtpolicy.Policy @@ -1635,8 +1632,8 @@ func (c *TowerClient) LookupTower(pubKey *btcec.PublicKey, }, nil } -// Stats returns the in-memory statistics of the client since startup. -func (c *TowerClient) Stats() ClientStats { +// getStats returns the in-memory statistics of the client since startup. +func (c *TowerClient) getStats() ClientStats { return c.stats.getStatsCopy() } diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index b28268f62..ac28cd21b 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -33,6 +33,9 @@ type TowerClientManager interface { // only serves as a way of removing the address from the watchtower // instead. RemoveTower(*btcec.PublicKey, net.Addr) error + + // Stats returns the in-memory statistics of the client since startup. + Stats() ClientStats } // Config provides the TowerClient with access to the resources it requires to @@ -299,3 +302,22 @@ func (m *Manager) RemoveTower(key *btcec.PublicKey, addr net.Addr) error { return nil } + +// Stats returns the in-memory statistics of the clients managed by the Manager +// since startup. +func (m *Manager) Stats() ClientStats { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + var resp ClientStats + for _, client := range m.clients { + stats := client.getStats() + resp.NumTasksAccepted += stats.NumTasksAccepted + resp.NumTasksIneligible += stats.NumTasksIneligible + resp.NumTasksPending += stats.NumTasksPending + resp.NumSessionsAcquired += stats.NumSessionsAcquired + resp.NumSessionsExhausted += stats.NumSessionsExhausted + } + + return resp +} From 0b3d751e33cebf63c8116c889d554e3749a69ab5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 16 May 2023 14:07:58 +0200 Subject: [PATCH 07/12] wtclient+lnrpc: move RegisteredTowers to Manager Move the `RegisteredTowers` method from the `Client` to the `Manager` interface. 
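
For illustration, the merge that the ListTowers RPC handler performs on the
per-policy map returned by the manager can be sketched as follows. This is a
minimal, self-contained example under simplified stand-in types (a string
policy key and a trimmed-down tower record) rather than the actual blob.Type
and RPC Tower types.

    package main

    import "fmt"

    // Tower is a trimmed-down stand-in for the RPC Tower message: just an
    // ID and the names of the sessions a client holds with it.
    type Tower struct {
            ID       uint64
            Sessions []string
    }

    // mergeTowers collapses per-policy tower lists into one entry per tower
    // ID, concatenating the session info contributed by each client.
    func mergeTowers(perPolicy map[string][]Tower) map[uint64]Tower {
            merged := make(map[uint64]Tower)
            for _, towers := range perPolicy {
                    for _, t := range towers {
                            existing, ok := merged[t.ID]
                            if !ok {
                                    merged[t.ID] = t
                                    continue
                            }

                            existing.Sessions = append(
                                    existing.Sessions, t.Sessions...,
                            )
                            merged[t.ID] = existing
                    }
            }

            return merged
    }

    func main() {
            perPolicy := map[string][]Tower{
                    "legacy": {{ID: 1, Sessions: []string{"s1"}}},
                    "anchor": {{ID: 1, Sessions: []string{"s2"}}, {ID: 2}},
            }

            fmt.Printf("%+v\n", mergeTowers(perPolicy))
    }

Towers known to both the legacy and anchor clients collapse into a single
entry whose session lists are concatenated, mirroring the updated handler in
the diff below.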
--- lnrpc/wtclientrpc/wtclient.go | 54 ++++++++++++++++------------------ watchtower/wtclient/client.go | 23 +++++---------- watchtower/wtclient/manager.go | 33 +++++++++++++++++++++ 3 files changed, 65 insertions(+), 45 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index a6b1fa0c0..9ba6fce6c 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -263,23 +263,7 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context, req.IncludeSessions, req.ExcludeExhaustedSessions, ) - anchorTowers, err := c.cfg.AnchorClient.RegisteredTowers(opts...) - if err != nil { - return nil, err - } - - // Collect all the anchor client towers. - rpcTowers := make(map[wtdb.TowerID]*Tower) - for _, tower := range anchorTowers { - rpcTower := marshallTower( - tower, PolicyType_ANCHOR, req.IncludeSessions, - ackCounts, committedUpdateCounts, - ) - - rpcTowers[tower.ID] = rpcTower - } - - legacyTowers, err := c.cfg.Client.RegisteredTowers(opts...) + towersPerBlobType, err := c.cfg.ClientMgr.RegisteredTowers(opts...) if err != nil { return nil, err } @@ -287,20 +271,32 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context, // Collect all the legacy client towers. If it has any of the same // towers that the anchors client has, then just add the session info // for the legacy client to the existing tower. - for _, tower := range legacyTowers { - rpcTower := marshallTower( - tower, PolicyType_LEGACY, req.IncludeSessions, - ackCounts, committedUpdateCounts, - ) - - t, ok := rpcTowers[tower.ID] - if !ok { - rpcTowers[tower.ID] = rpcTower - continue + rpcTowers := make(map[wtdb.TowerID]*Tower) + for blobType, towers := range towersPerBlobType { + policyType := PolicyType_LEGACY + if blobType.IsAnchorChannel() { + policyType = PolicyType_ANCHOR } - t.SessionInfo = append(t.SessionInfo, rpcTower.SessionInfo...) - t.Sessions = append(t.Sessions, rpcTower.Sessions...) + for _, tower := range towers { + rpcTower := marshallTower( + tower, policyType, req.IncludeSessions, + ackCounts, committedUpdateCounts, + ) + + t, ok := rpcTowers[tower.ID] + if !ok { + rpcTowers[tower.ID] = rpcTower + continue + } + + t.SessionInfo = append( + t.SessionInfo, rpcTower.SessionInfo..., + ) + t.Sessions = append( + t.Sessions, rpcTower.Sessions..., + ) + } } towers := make([]*Tower, 0, len(rpcTowers)) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index d7d3ef900..e9b1c6dff 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,11 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // RegisteredTowers retrieves the list of watchtowers registered with - // the client. - RegisteredTowers(...wtdb.ClientSessionListOption) ([]*RegisteredTower, - error) - // LookupTower retrieves a registered watchtower through its public key. LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) (*RegisteredTower, error) @@ -1564,17 +1559,13 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { return nil } -// RegisteredTowers retrieves the list of watchtowers registered with the +// registeredTowers retrieves the list of watchtowers registered with the // client. -func (c *TowerClient) RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( - []*RegisteredTower, error) { - - // Retrieve all of our towers along with all of our sessions. 
- towers, err := c.cfg.DB.ListTowers() - if err != nil { - return nil, err - } +func (c *TowerClient) registeredTowers(towers []*wtdb.Tower, + opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) { + // Generate a filter that will fetch all the client's sessions + // regardless of if they are active or not. opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false))) clientSessions, err := c.cfg.DB.ListClientSessions(nil, opts...) @@ -1582,8 +1573,8 @@ func (c *TowerClient) RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( return nil, err } - // Construct a lookup map that coalesces all of the sessions for a - // specific watchtower. + // Construct a lookup map that coalesces all the sessions for a specific + // watchtower. towerSessions := make( map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession, ) diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index ac28cd21b..c0d501e6f 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/tor" "github.com/lightningnetwork/lnd/watchtower/blob" + "github.com/lightningnetwork/lnd/watchtower/wtdb" "github.com/lightningnetwork/lnd/watchtower/wtpolicy" ) @@ -36,6 +37,12 @@ type TowerClientManager interface { // Stats returns the in-memory statistics of the client since startup. Stats() ClientStats + + // RegisteredTowers retrieves the list of watchtowers registered with + // the client. It returns a set of registered towers per client policy + // type. + RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( + map[blob.Type][]*RegisteredTower, error) } // Config provides the TowerClient with access to the resources it requires to @@ -321,3 +328,29 @@ func (m *Manager) Stats() ClientStats { return resp } + +// RegisteredTowers retrieves the list of watchtowers being used by the various +// clients. +func (m *Manager) RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( + map[blob.Type][]*RegisteredTower, error) { + + towers, err := m.cfg.DB.ListTowers() + if err != nil { + return nil, err + } + + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + resp := make(map[blob.Type][]*RegisteredTower) + for _, client := range m.clients { + towers, err := client.registeredTowers(towers, opts...) + if err != nil { + return nil, err + } + + resp[client.Policy().BlobType] = towers + } + + return resp, nil +} From 4e51bf3a3ff0cc6697cb0e8eb4f763f6479c09c8 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 16 May 2023 14:11:43 +0200 Subject: [PATCH 08/12] wtclient+lnrpc: move LookupTower to Manager --- lnrpc/wtclientrpc/wtclient.go | 56 ++++++++++++++++++---------------- watchtower/wtclient/client.go | 14 ++------- watchtower/wtclient/manager.go | 30 ++++++++++++++++++ 3 files changed, 62 insertions(+), 38 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index 9ba6fce6c..9ee94977b 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -324,40 +324,42 @@ func (c *WatchtowerClient) GetTowerInfo(ctx context.Context, req.IncludeSessions, req.ExcludeExhaustedSessions, ) - // Get the tower and its sessions from anchors client. - tower, err := c.cfg.AnchorClient.LookupTower(pubKey, opts...) - if err != nil { - return nil, err - } - rpcTower := marshallTower( - tower, PolicyType_ANCHOR, req.IncludeSessions, ackCounts, - committedUpdateCounts, - ) - - // Get the tower and its sessions from legacy client. 
- tower, err = c.cfg.Client.LookupTower(pubKey, opts...) + towersPerBlobType, err := c.cfg.ClientMgr.LookupTower(pubKey, opts...) if err != nil { return nil, err } - rpcLegacyTower := marshallTower( - tower, PolicyType_LEGACY, req.IncludeSessions, ackCounts, - committedUpdateCounts, - ) + var resTower *Tower + for blobType, tower := range towersPerBlobType { + policyType := PolicyType_LEGACY + if blobType.IsAnchorChannel() { + policyType = PolicyType_ANCHOR + } - if !bytes.Equal(rpcTower.Pubkey, rpcLegacyTower.Pubkey) { - return nil, fmt.Errorf("legacy and anchor clients returned " + - "inconsistent results for the given tower") + rpcTower := marshallTower( + tower, policyType, req.IncludeSessions, + ackCounts, committedUpdateCounts, + ) + + if resTower == nil { + resTower = rpcTower + continue + } + + if !bytes.Equal(rpcTower.Pubkey, resTower.Pubkey) { + return nil, fmt.Errorf("tower clients returned " + + "inconsistent results for the given tower") + } + + resTower.SessionInfo = append( + resTower.SessionInfo, rpcTower.SessionInfo..., + ) + resTower.Sessions = append( + resTower.Sessions, rpcTower.Sessions..., + ) } - rpcTower.SessionInfo = append( - rpcTower.SessionInfo, rpcLegacyTower.SessionInfo..., - ) - rpcTower.Sessions = append( - rpcTower.Sessions, rpcLegacyTower.Sessions..., - ) - - return rpcTower, nil + return resTower, nil } // constructFunctionalOptions is a helper function that constructs a list of diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index e9b1c6dff..d68be1afd 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,10 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // LookupTower retrieves a registered watchtower through its public key. - LookupTower(*btcec.PublicKey, - ...wtdb.ClientSessionListOption) (*RegisteredTower, error) - // Policy returns the active client policy configuration. Policy() wtpolicy.Policy @@ -1600,15 +1596,11 @@ func (c *TowerClient) registeredTowers(towers []*wtdb.Tower, return registeredTowers, nil } -// LookupTower retrieves a registered watchtower through its public key. -func (c *TowerClient) LookupTower(pubKey *btcec.PublicKey, +// lookupTower retrieves the info of sessions held with the given tower handled +// by this client. +func (c *TowerClient) lookupTower(tower *wtdb.Tower, opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) { - tower, err := c.cfg.DB.LoadTower(pubKey) - if err != nil { - return nil, err - } - opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false))) towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID, opts...) diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index c0d501e6f..ba2cdb27d 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -43,6 +43,10 @@ type TowerClientManager interface { // type. RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( map[blob.Type][]*RegisteredTower, error) + + // LookupTower retrieves a registered watchtower through its public key. 
+ LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) ( + map[blob.Type]*RegisteredTower, error) } // Config provides the TowerClient with access to the resources it requires to @@ -354,3 +358,29 @@ func (m *Manager) RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( return resp, nil } + +// LookupTower retrieves a registered watchtower through its public key. +func (m *Manager) LookupTower(key *btcec.PublicKey, + opts ...wtdb.ClientSessionListOption) (map[blob.Type]*RegisteredTower, + error) { + + tower, err := m.cfg.DB.LoadTower(key) + if err != nil { + return nil, err + } + + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + resp := make(map[blob.Type]*RegisteredTower) + for _, client := range m.clients { + tower, err := client.lookupTower(tower, opts...) + if err != nil { + return nil, err + } + + resp[client.Policy().BlobType] = tower + } + + return resp, nil +} From ab2f781b4ad97d63b39e7b411d03c4f40140dd91 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 16 May 2023 14:15:09 +0200 Subject: [PATCH 09/12] wtclient+lnrpc: move Policy to Manager --- lnrpc/wtclientrpc/wtclient.go | 13 +++++++++---- watchtower/wtclient/client.go | 7 ++----- watchtower/wtclient/manager.go | 22 ++++++++++++++++++++-- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/lnrpc/wtclientrpc/wtclient.go b/lnrpc/wtclientrpc/wtclient.go index 9ee94977b..bf7f9a1da 100644 --- a/lnrpc/wtclientrpc/wtclient.go +++ b/lnrpc/wtclientrpc/wtclient.go @@ -16,9 +16,9 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/watchtower" + "github.com/lightningnetwork/lnd/watchtower/blob" "github.com/lightningnetwork/lnd/watchtower/wtclient" "github.com/lightningnetwork/lnd/watchtower/wtdb" - "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "google.golang.org/grpc" "gopkg.in/macaroon-bakery.v2/bakery" ) @@ -437,17 +437,22 @@ func (c *WatchtowerClient) Policy(ctx context.Context, return nil, err } - var policy wtpolicy.Policy + var blobType blob.Type switch req.PolicyType { case PolicyType_LEGACY: - policy = c.cfg.Client.Policy() + blobType = blob.TypeAltruistCommit case PolicyType_ANCHOR: - policy = c.cfg.AnchorClient.Policy() + blobType = blob.TypeAltruistAnchorCommit default: return nil, fmt.Errorf("unknown policy type: %v", req.PolicyType) } + policy, err := c.cfg.ClientMgr.Policy(blobType) + if err != nil { + return nil, err + } + return &PolicyResponse{ MaxUpdates: uint32(policy.MaxUpdates), SweepSatPerVbyte: uint32(policy.SweepFeeRate.FeePerVByte()), diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index d68be1afd..871960712 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -95,9 +95,6 @@ type RegisteredTower struct { // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { - // Policy returns the active client policy configuration. - Policy() wtpolicy.Policy - // RegisterChannel persistently initializes any channel-dependent // parameters within the client. This should be called during link // startup to ensure that the client is able to support the link during @@ -1620,8 +1617,8 @@ func (c *TowerClient) getStats() ClientStats { return c.stats.getStatsCopy() } -// Policy returns the active client policy configuration. -func (c *TowerClient) Policy() wtpolicy.Policy { +// policy returns the active client policy configuration. 
+func (c *TowerClient) policy() wtpolicy.Policy { return c.cfg.Policy } diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index ba2cdb27d..61386548e 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -38,6 +38,9 @@ type TowerClientManager interface { // Stats returns the in-memory statistics of the client since startup. Stats() ClientStats + // Policy returns the active client policy configuration. + Policy(blob.Type) (wtpolicy.Policy, error) + // RegisteredTowers retrieves the list of watchtowers registered with // the client. It returns a set of registered towers per client policy // type. @@ -353,7 +356,7 @@ func (m *Manager) RegisteredTowers(opts ...wtdb.ClientSessionListOption) ( return nil, err } - resp[client.Policy().BlobType] = towers + resp[client.policy().BlobType] = towers } return resp, nil @@ -379,8 +382,23 @@ func (m *Manager) LookupTower(key *btcec.PublicKey, return nil, err } - resp[client.Policy().BlobType] = tower + resp[client.policy().BlobType] = tower } return resp, nil } + +// Policy returns the active client policy configuration for the client using +// the given blob type. +func (m *Manager) Policy(blobType blob.Type) (wtpolicy.Policy, error) { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + var policy wtpolicy.Policy + client, ok := m.clients[blobType] + if !ok { + return policy, fmt.Errorf("no client for the given blob type") + } + + return client.policy(), nil +} From fcfdf699e3aea3688a396d2ef4fc17a843042409 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 16 May 2023 15:22:45 +0200 Subject: [PATCH 10/12] multi: move BackupState and RegisterChannel to Manager This commit moves over the last two methods, `RegisterChannel` and `BackupState` from the `Client` to the `Manager` interface. With this change, we no longer need to pass around the individual clients around and now only need to pass the manager around. To do this change, all the goroutines that handle channel closes, closable sessions needed to be moved to the Manager and so a large part of this commit is just moving this code from the TowerClient to the Manager. --- htlcswitch/interfaces.go | 2 +- htlcswitch/link.go | 4 +- lnrpc/wtclientrpc/config.go | 8 - peer/brontide.go | 22 +- rpcserver.go | 9 +- server.go | 24 +- subrpcserver_config.go | 12 +- watchtower/wtclient/client.go | 432 ++----------------------- watchtower/wtclient/client_test.go | 15 +- watchtower/wtclient/manager.go | 494 ++++++++++++++++++++++++++++- 10 files changed, 551 insertions(+), 471 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 1ce4ffb2e..92e071f13 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -250,7 +250,7 @@ type TowerClient interface { // parameters within the client. This should be called during link // startup to ensure that the client is able to support the link during // operation. - RegisterChannel(lnwire.ChannelID) error + RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error // BackupState initiates a request to back up a particular revoked // state. If the method returns nil, the backup is guaranteed to be diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2a1e49923..7cd37ac35 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -416,7 +416,9 @@ func (l *channelLink) Start() error { // If the config supplied watchtower client, ensure the channel is // registered before trying to use it during operation. 
if l.cfg.TowerClient != nil { - err := l.cfg.TowerClient.RegisterChannel(l.ChanID()) + err := l.cfg.TowerClient.RegisterChannel( + l.ChanID(), l.channel.State().ChanType, + ) if err != nil { return err } diff --git a/lnrpc/wtclientrpc/config.go b/lnrpc/wtclientrpc/config.go index ed0401b13..58566c19a 100644 --- a/lnrpc/wtclientrpc/config.go +++ b/lnrpc/wtclientrpc/config.go @@ -15,14 +15,6 @@ type Config struct { // Active indicates if the watchtower client is enabled. Active bool - // Client is the backing watchtower client that we'll interact with - // through the watchtower RPC subserver. - Client wtclient.Client - - // AnchorClient is the backing watchtower client for anchor channels that - // we'll interact through the watchtower RPC subserver. - AnchorClient wtclient.Client - // ClientMgr is a tower client manager that manages a set of tower // clients. ClientMgr wtclient.TowerClientManager diff --git a/peer/brontide.go b/peer/brontide.go index d7d93f4fa..1751ef5ac 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -268,12 +268,8 @@ type Config struct { // HtlcNotifier is used when creating a ChannelLink. HtlcNotifier *htlcswitch.HtlcNotifier - // TowerClient is used by legacy channels to backup revoked states. - TowerClient wtclient.Client - - // AnchorTowerClient is used by anchor channels to backup revoked - // states. - AnchorTowerClient wtclient.Client + // TowerClient is used to backup revoked states. + TowerClient wtclient.TowerClientManager // DisconnectPeer is used to disconnect this peer if the cooperative close // process fails. @@ -1040,14 +1036,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update) } - chanType := lnChan.State().ChanType - - // Select the appropriate tower client based on the channel type. It's - // okay if the clients are disabled altogether and these values are nil, - // as the link will check for nilness before using either. - var towerClient htlcswitch.TowerClient - switch { - case chanType.IsTaproot(): + var towerClient wtclient.TowerClientManager + if lnChan.ChanType().IsTaproot() { // Leave the tower client as nil for now until the tower client // has support for taproot channels. 
// @@ -1060,9 +1050,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, "are not yet taproot channel compatible", chanPoint) } - case chanType.HasAnchors(): - towerClient = p.cfg.AnchorTowerClient - default: + } else { towerClient = p.cfg.TowerClient } diff --git a/rpcserver.go b/rpcserver.go index 4685b731b..8062c4fdc 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -743,11 +743,10 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry, s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter, routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB, - s.sweeper, tower, s.towerClient, s.anchorTowerClient, - s.towerClientMgr, r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, - genAmpInvoiceFeatures, s.getNodeAnnouncement, - s.updateAndBrodcastSelfNode, parseAddr, rpcsLog, - s.aliasMgr.GetPeerAlias, + s.sweeper, tower, s.towerClientMgr, r.cfg.net.ResolveTCPAddr, + genInvoiceFeatures, genAmpInvoiceFeatures, + s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr, + rpcsLog, s.aliasMgr.GetPeerAlias, ) if err != nil { return err diff --git a/server.go b/server.go index b20c692ce..076802606 100644 --- a/server.go +++ b/server.go @@ -284,10 +284,6 @@ type server struct { towerClientMgr *wtclient.Manager - towerClient wtclient.Client - - anchorTowerClient wtclient.Client - connMgr *connmgr.ConnManager sigPool *lnwallet.SigPool @@ -1577,7 +1573,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } // Register a legacy tower client. - s.towerClient, err = s.towerClientMgr.NewClient(policy) + _, err = s.towerClientMgr.NewClient(policy) if err != nil { return nil, err } @@ -1589,9 +1585,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, blob.Type(blob.FlagAnchorChannel) // Register an anchors tower client. - s.anchorTowerClient, err = s.towerClientMgr.NewClient( - anchorPolicy, - ) + _, err = s.towerClientMgr.NewClient(anchorPolicy) if err != nil { return nil, err } @@ -3782,6 +3776,17 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, } } + // If we directly set the peer.Config TowerClient member to the + // s.towerClientMgr then in the case that the s.towerClientMgr is nil, + // the peer.Config's TowerClient member will not evaluate to nil even + // though the underlying value is nil. To avoid this gotcha which can + // cause a panic, we need to explicitly pass nil to the peer.Config's + // TowerClient if needed. + var towerClient wtclient.TowerClientManager + if s.towerClientMgr != nil { + towerClient = s.towerClientMgr + } + // Now that we've established a connection, create a peer, and it to the // set of currently active peers. 
Configure the peer with the incoming // and outgoing broadcast deltas to prevent htlcs from being accepted or @@ -3820,8 +3825,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, Invoices: s.invoices, ChannelNotifier: s.channelNotifier, HtlcNotifier: s.htlcNotifier, - TowerClient: s.towerClient, - AnchorTowerClient: s.anchorTowerClient, + TowerClient: towerClient, DisconnectPeer: s.DisconnectPeer, GenNodeAnnouncement: func(...netann.NodeAnnModifier) ( lnwire.NodeAnnouncement, error) { diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 27bef9f92..175515de3 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -113,8 +113,6 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, chanStateDB *channeldb.ChannelStateDB, sweeper *sweep.UtxoSweeper, tower *watchtower.Standalone, - towerClient wtclient.Client, - anchorTowerClient wtclient.Client, towerClientMgr wtclient.TowerClientManager, tcpResolver lncfg.TCPResolver, genInvoiceFeatures func() *lnwire.FeatureVector, @@ -288,15 +286,9 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, case *wtclientrpc.Config: subCfgValue := extractReflectValue(subCfg) - if towerClient != nil && anchorTowerClient != nil { + if towerClientMgr != nil { subCfgValue.FieldByName("Active").Set( - reflect.ValueOf(towerClient != nil), - ) - subCfgValue.FieldByName("Client").Set( - reflect.ValueOf(towerClient), - ) - subCfgValue.FieldByName("AnchorClient").Set( - reflect.ValueOf(anchorTowerClient), + reflect.ValueOf(towerClientMgr != nil), ) subCfgValue.FieldByName("ClientMgr").Set( reflect.ValueOf(towerClientMgr), diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 871960712..bacc45ab1 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -13,14 +13,10 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/channelnotifier" - "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/watchtower/wtdb" "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "github.com/lightningnetwork/lnd/watchtower/wtserver" @@ -57,9 +53,7 @@ func (c *TowerClient) genSessionFilter( activeOnly bool) wtdb.ClientSessionFilterFn { return func(session *wtdb.ClientSession) bool { - if c.cfg.Policy.IsAnchorChannel() != - session.Policy.IsAnchorChannel() { - + if c.cfg.Policy.TxPolicy != session.Policy.TxPolicy { return false } @@ -92,22 +86,6 @@ type RegisteredTower struct { ActiveSessionCandidate bool } -// Client is the primary interface used by the daemon to control a client's -// lifecycle and backup revoked states. -type Client interface { - // RegisterChannel persistently initializes any channel-dependent - // parameters within the client. This should be called during link - // startup to ensure that the client is able to support the link during - // operation. - RegisterChannel(lnwire.ChannelID) error - - // BackupState initiates a request to back up a particular revoked - // state. If the method returns nil, the backup is guaranteed to be - // successful unless the justice transaction would create dust outputs - // when trying to abide by the negotiated policy. 
- BackupState(chanID *lnwire.ChannelID, stateNum uint64) error -} - // BreachRetributionBuilder is a function that can be used to construct a // BreachRetribution from a channel ID and a commitment height. type BreachRetributionBuilder func(id lnwire.ChannelID, @@ -159,6 +137,8 @@ type towerClientCfg struct { // sessions recorded in the database, those sessions will be ignored and // new sessions will be requested immediately. Policy wtpolicy.Policy + + getSweepScript func(lnwire.ChannelID) ([]byte, bool) } // TowerClient is a concrete implementation of the Client interface, offering a @@ -179,11 +159,6 @@ type TowerClient struct { sessionQueue *sessionQueue prevTask *wtdb.BackupID - closableSessionQueue *sessionCloseMinHeap - - backupMu sync.Mutex - chanInfos wtdb.ChannelInfos - statTicker *time.Ticker stats *clientStats @@ -194,10 +169,6 @@ type TowerClient struct { quit chan struct{} } -// Compile-time constraint to ensure *TowerClient implements the Client -// interface. -var _ Client = (*TowerClient)(nil) - // newTowerClient initializes a new TowerClient from the provided // towerClientCfg. An error is returned if the client could not be initialized. func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { @@ -209,11 +180,6 @@ func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { plog := build.NewPrefixLog(prefix, log) - chanInfos, err := cfg.DB.FetchChanInfos() - if err != nil { - return nil, err - } - queueDB := cfg.DB.GetDBQueue([]byte(identifier)) queue, err := NewDiskOverflowQueue[*wtdb.BackupID]( queueDB, cfg.MaxTasksInMemQueue, plog, @@ -223,17 +189,15 @@ func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { } c := &TowerClient{ - cfg: cfg, - log: plog, - pipeline: queue, - activeSessions: newSessionQueueSet(), - chanInfos: chanInfos, - closableSessionQueue: newSessionCloseMinHeap(), - statTicker: time.NewTicker(DefaultStatInterval), - stats: new(clientStats), - newTowers: make(chan *newTowerMsg), - staleTowers: make(chan *staleTowerMsg), - quit: make(chan struct{}), + cfg: cfg, + log: plog, + pipeline: queue, + activeSessions: newSessionQueueSet(), + statTicker: time.NewTicker(DefaultStatInterval), + stats: new(clientStats), + newTowers: make(chan *newTowerMsg), + staleTowers: make(chan *staleTowerMsg), + quit: make(chan struct{}), } candidateTowers := newTowerListIterator() @@ -410,65 +374,9 @@ func (c *TowerClient) start() error { } } - chanSub, err := c.cfg.SubscribeChannelEvents() - if err != nil { - return err - } - - // Iterate over the list of registered channels and check if - // any of them can be marked as closed. - for id := range c.chanInfos { - isClosed, closedHeight, err := c.isChannelClosed(id) - if err != nil { - return err - } - - if !isClosed { - continue - } - - _, err = c.cfg.DB.MarkChannelClosed(id, closedHeight) - if err != nil { - c.log.Errorf("could not mark channel(%s) as "+ - "closed: %v", id, err) - - continue - } - - // Since the channel has been marked as closed, we can - // also remove it from the channel summaries map. - delete(c.chanInfos, id) - } - - // Load all closable sessions. - closableSessions, err := c.cfg.DB.ListClosableSessions() - if err != nil { - return err - } - - err = c.trackClosableSessions(closableSessions) - if err != nil { - return err - } - - c.wg.Add(1) - go c.handleChannelCloses(chanSub) - - // Subscribe to new block events. 
- blockEvents, err := c.cfg.ChainNotifier.RegisterBlockEpochNtfn( - nil, - ) - if err != nil { - return err - } - - c.wg.Add(1) - go c.handleClosableSessions(blockEvents) - - // Now start the session negotiator, which will allow us to - // request new session as soon as the backupDispatcher starts - // up. - err = c.negotiator.Start() + // Now start the session negotiator, which will allow us to request new + // session as soon as the backupDispatcher starts up. + err := c.negotiator.Start() if err != nil { return err } @@ -538,84 +446,15 @@ func (c *TowerClient) stop() error { return returnErr } -// RegisterChannel persistently initializes any channel-dependent parameters -// within the client. This should be called during link startup to ensure that -// the client is able to support the link during operation. -func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { - c.backupMu.Lock() - defer c.backupMu.Unlock() - - // If a pkscript for this channel already exists, the channel has been - // previously registered. - if _, ok := c.chanInfos[chanID]; ok { - return nil - } - - // Otherwise, generate a new sweep pkscript used to sweep funds for this - // channel. - pkScript, err := c.cfg.NewAddress() - if err != nil { - return err - } - - // Persist the sweep pkscript so that restarts will not introduce - // address inflation when the channel is reregistered after a restart. - err = c.cfg.DB.RegisterChannel(chanID, pkScript) - if err != nil { - return err - } - - // Finally, cache the pkscript in our in-memory cache to avoid db - // lookups for the remainder of the daemon's execution. - c.chanInfos[chanID] = &wtdb.ChannelInfo{ - ClientChanSummary: wtdb.ClientChanSummary{ - SweepPkScript: pkScript, - }, - } - - return nil -} - -// BackupState initiates a request to back up a particular revoked state. If the +// backupState initiates a request to back up a particular revoked state. If the // method returns nil, the backup is guaranteed to be successful unless the: // - justice transaction would create dust outputs when trying to abide by the // negotiated policy, or // - breached outputs contain too little value to sweep at the target sweep // fee rate. -func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, +func (c *TowerClient) backupState(chanID *lnwire.ChannelID, stateNum uint64) error { - // Make sure that this channel is registered with the tower client. - c.backupMu.Lock() - info, ok := c.chanInfos[*chanID] - if !ok { - c.backupMu.Unlock() - - return ErrUnregisteredChannel - } - - // Ignore backups that have already been presented to the client. - var duplicate bool - info.MaxHeight.WhenSome(func(maxHeight uint64) { - if stateNum <= maxHeight { - duplicate = true - } - }) - if duplicate { - c.backupMu.Unlock() - - c.log.Debugf("Ignoring duplicate backup for chanid=%v at "+ - "height=%d", chanID, stateNum) - - return nil - } - - // This backup has a higher commit height than any known backup for this - // channel. We'll update our tip so that we won't accept it again if the - // link flaps. - c.chanInfos[*chanID].MaxHeight = fn.Some(stateNum) - c.backupMu.Unlock() - id := &wtdb.BackupID{ ChanID: *chanID, CommitHeight: stateNum, @@ -667,215 +506,10 @@ func (c *TowerClient) nextSessionQueue() (*sessionQueue, error) { return c.getOrInitActiveQueue(candidateSession, updates), nil } -// handleChannelCloses listens for channel close events and marks channels as -// closed in the DB. -// -// NOTE: This method MUST be run as a goroutine. 
-func (c *TowerClient) handleChannelCloses(chanSub subscribe.Subscription) { - defer c.wg.Done() - - c.log.Debugf("Starting channel close handler") - defer c.log.Debugf("Stopping channel close handler") - - for { - select { - case update, ok := <-chanSub.Updates(): - if !ok { - c.log.Debugf("Channel notifier has exited") - return - } - - // We only care about channel-close events. - event, ok := update.(channelnotifier.ClosedChannelEvent) - if !ok { - continue - } - - chanID := lnwire.NewChanIDFromOutPoint( - &event.CloseSummary.ChanPoint, - ) - - c.log.Debugf("Received ClosedChannelEvent for "+ - "channel: %s", chanID) - - err := c.handleClosedChannel( - chanID, event.CloseSummary.CloseHeight, - ) - if err != nil { - c.log.Errorf("Could not handle channel close "+ - "event for channel(%s): %v", chanID, - err) - } - - case <-c.quit: - return - } - } -} - -// handleClosedChannel handles the closure of a single channel. It will mark the -// channel as closed in the DB, then it will handle all the sessions that are -// now closable due to the channel closure. -func (c *TowerClient) handleClosedChannel(chanID lnwire.ChannelID, - closeHeight uint32) error { - - c.backupMu.Lock() - defer c.backupMu.Unlock() - - // We only care about channels registered with the tower client. - if _, ok := c.chanInfos[chanID]; !ok { - return nil - } - - c.log.Debugf("Marking channel(%s) as closed", chanID) - - sessions, err := c.cfg.DB.MarkChannelClosed(chanID, closeHeight) - if err != nil { - return fmt.Errorf("could not mark channel(%s) as closed: %w", - chanID, err) - } - - closableSessions := make(map[wtdb.SessionID]uint32, len(sessions)) - for _, sess := range sessions { - closableSessions[sess] = closeHeight - } - - c.log.Debugf("Tracking %d new closable sessions as a result of "+ - "closing channel %s", len(closableSessions), chanID) - - err = c.trackClosableSessions(closableSessions) - if err != nil { - return fmt.Errorf("could not track closable sessions: %w", err) - } - - delete(c.chanInfos, chanID) - - return nil -} - -// handleClosableSessions listens for new block notifications. For each block, -// it checks the closableSessionQueue to see if there is a closable session with -// a delete-height smaller than or equal to the new block, if there is then the -// tower is informed that it can delete the session, and then we also delete it -// from our DB. -func (c *TowerClient) handleClosableSessions( - blocksChan *chainntnfs.BlockEpochEvent) { - - defer c.wg.Done() - - c.log.Debug("Starting closable sessions handler") - defer c.log.Debug("Stopping closable sessions handler") - - for { - select { - case newBlock := <-blocksChan.Epochs: - if newBlock == nil { - return - } - - height := uint32(newBlock.Height) - for { - select { - case <-c.quit: - return - default: - } - - // If there are no closable sessions that we - // need to handle, then we are done and can - // reevaluate when the next block comes. - item := c.closableSessionQueue.Top() - if item == nil { - break - } - - // If there is closable session but the delete - // height we have set for it is after the - // current block height, then our work is done. - if item.deleteHeight > height { - break - } - - // Otherwise, we pop this item from the heap - // and handle it. - c.closableSessionQueue.Pop() - - // Stop the session and remove it from the - // in-memory set. 
- err := c.activeSessions.StopAndRemove( - item.sessionID, - ) - if err != nil { - c.log.Errorf("could not remove "+ - "session(%s) from in-memory "+ - "set: %v", item.sessionID, err) - - return - } - - // Fetch the session from the DB so that we can - // extract the Tower info. - sess, err := c.cfg.DB.GetClientSession( - item.sessionID, - ) - if err != nil { - c.log.Errorf("error calling "+ - "GetClientSession for "+ - "session %s: %v", - item.sessionID, err) - - continue - } - - err = c.deleteSessionFromTower(sess) - if err != nil { - c.log.Errorf("error deleting "+ - "session %s from tower: %v", - sess.ID, err) - - continue - } - - err = c.cfg.DB.DeleteSession(item.sessionID) - if err != nil { - c.log.Errorf("could not delete "+ - "session(%s) from DB: %w", - sess.ID, err) - - continue - } - } - - case <-c.quit: - return - } - } -} - -// trackClosableSessions takes in a map of session IDs to the earliest block -// height at which the session should be deleted. For each of the sessions, -// a random delay is added to the block height and the session is added to the -// closableSessionQueue. -func (c *TowerClient) trackClosableSessions( - sessions map[wtdb.SessionID]uint32) error { - - // For each closable session, add a random delay to its close - // height and add it to the closableSessionQueue. - for sID, blockHeight := range sessions { - delay, err := newRandomDelay(c.cfg.SessionCloseRange) - if err != nil { - return err - } - - deleteHeight := blockHeight + delay - - c.closableSessionQueue.Push(&sessionCloseItem{ - sessionID: sID, - deleteHeight: deleteHeight, - }) - } - - return nil +// stopAndRemoveSession stops the session with the given ID and removes it from +// the in-memory active sessions set. +func (c *TowerClient) stopAndRemoveSession(id wtdb.SessionID) error { + return c.activeSessions.StopAndRemove(id) } // deleteSessionFromTower dials the tower that we created the session with and @@ -1154,19 +788,15 @@ func (c *TowerClient) backupDispatcher() { // that are rejected because the active sessionQueue is full will be cached as // the prevTask, and should be reprocessed after obtaining a new sessionQueue. func (c *TowerClient) processTask(task *wtdb.BackupID) { - c.backupMu.Lock() - summary, ok := c.chanInfos[task.ChanID] + script, ok := c.cfg.getSweepScript(task.ChanID) if !ok { - c.backupMu.Unlock() - log.Infof("not processing task for unregistered channel: %s", task.ChanID) return } - c.backupMu.Unlock() - backupTask := newBackupTask(*task, summary.SweepPkScript) + backupTask := newBackupTask(*task, script) status, accepted := c.sessionQueue.AcceptTask(backupTask) if accepted { @@ -1410,22 +1040,6 @@ func (c *TowerClient) initActiveQueue(s *ClientSession, return sq } -// isChanClosed can be used to check if the channel with the given ID has been -// closed. If it has been, the block height in which its closing transaction was -// mined will also be returned. -func (c *TowerClient) isChannelClosed(id lnwire.ChannelID) (bool, uint32, - error) { - - chanSum, err := c.cfg.FetchClosedChannel(id) - if errors.Is(err, channeldb.ErrClosedChannelNotFound) { - return false, 0, nil - } else if err != nil { - return false, 0, err - } - - return true, chanSum.CloseHeight, nil -} - // addTower adds a new watchtower reachable at the given address and considers // it for new sessions. 
If the watchtower already exists, then any new addresses // included will be considered when dialing it for session negotiations and diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index f1deb697a..452404919 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -403,7 +403,6 @@ type testHarness struct { clientDB *wtdb.ClientDB clientCfg *wtclient.Config clientPolicy wtpolicy.Policy - client wtclient.Client server *serverHarness net *mockNet @@ -564,7 +563,7 @@ func (h *testHarness) startClient() { h.clientMgr, err = wtclient.NewManager(h.clientCfg) require.NoError(h.t, err) - h.client, err = h.clientMgr.NewClient(h.clientPolicy) + _, err = h.clientMgr.NewClient(h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.clientMgr.Start()) require.NoError(h.t, h.clientMgr.AddTower(towerAddr)) @@ -668,7 +667,7 @@ func (h *testHarness) registerChannel(id uint64) { h.t.Helper() chanID := chanIDFromInt(id) - err := h.client.RegisterChannel(chanID) + err := h.clientMgr.RegisterChannel(chanID, channeldb.SingleFunderBit) require.NoError(h.t, err) } @@ -709,8 +708,8 @@ func (h *testHarness) backupState(id, i uint64, expErr error) { chanID := chanIDFromInt(id) - err := h.client.BackupState(&chanID, retribution.RevokedStateNum) - require.ErrorIs(h.t, expErr, err) + err := h.clientMgr.BackupState(&chanID, retribution.RevokedStateNum) + require.ErrorIs(h.t, err, expErr) } // sendPayments instructs the channel identified by id to send amt to the remote @@ -1254,6 +1253,7 @@ var clientTests = []clientTest{ // Restart the client and allow it to process the // committed update. h.startClient() + h.registerChannel(chanID) // Wait for the committed update to be accepted by the // tower. @@ -1555,6 +1555,7 @@ var clientTests = []clientTest{ // Restart the client with a new policy. h.clientPolicy.MaxUpdates = 20 h.startClient() + h.registerChannel(chanID) // Now, queue the second half of the retributions. h.backupStates(chanID, numUpdates/2, numUpdates, nil) @@ -1605,6 +1606,7 @@ var clientTests = []clientTest{ // maintained across restarts. require.NoError(h.t, h.clientMgr.Stop()) h.startClient() + h.registerChannel(chanID) // Try to back up the full range of retributions. Only // the second half should actually be sent. @@ -2126,6 +2128,7 @@ var clientTests = []clientTest{ require.NoError(h.t, h.clientMgr.Stop()) h.server.start() h.startClient() + h.registerChannel(chanID) // Back up a few more states. h.backupStates(chanID, numUpdates/2, numUpdates, nil) @@ -2520,7 +2523,7 @@ var clientTests = []clientTest{ // Wait for channel to be "unregistered". 
chanID := chanIDFromInt(chanIDInt) err = wait.Predicate(func() bool { - err := h.client.BackupState(&chanID, 0) + err := h.clientMgr.BackupState(&chanID, 0) return errors.Is( err, wtclient.ErrUnregisteredChannel, diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index 61386548e..be0042f43 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -1,6 +1,7 @@ package wtclient import ( + "errors" "fmt" "net" "sync" @@ -10,6 +11,8 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/subscribe" @@ -50,6 +53,18 @@ type TowerClientManager interface { // LookupTower retrieves a registered watchtower through its public key. LookupTower(*btcec.PublicKey, ...wtdb.ClientSessionListOption) ( map[blob.Type]*RegisteredTower, error) + + // RegisterChannel persistently initializes any channel-dependent + // parameters within the client. This should be called during link + // startup to ensure that the client is able to support the link during + // operation. + RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error + + // BackupState initiates a request to back up a particular revoked + // state. If the method returns nil, the backup is guaranteed to be + // successful unless the justice transaction would create dust outputs + // when trying to abide by the negotiated policy. + BackupState(chanID *lnwire.ChannelID, stateNum uint64) error } // Config provides the TowerClient with access to the resources it requires to @@ -143,6 +158,15 @@ type Manager struct { clients map[blob.Type]*TowerClient clientsMu sync.Mutex + + backupMu sync.Mutex + chanInfos wtdb.ChannelInfos + chanBlobType map[lnwire.ChannelID]blob.Type + + closableSessionQueue *sessionCloseMinHeap + + wg sync.WaitGroup + quit chan struct{} } var _ TowerClientManager = (*Manager)(nil) @@ -163,9 +187,18 @@ func NewManager(config *Config) (*Manager, error) { cfg.WriteTimeout = DefaultWriteTimeout } + chanInfos, err := cfg.DB.FetchChanInfos() + if err != nil { + return nil, err + } + return &Manager{ - cfg: &cfg, - clients: make(map[blob.Type]*TowerClient), + cfg: &cfg, + clients: make(map[blob.Type]*TowerClient), + chanBlobType: make(map[lnwire.ChannelID]blob.Type), + chanInfos: chanInfos, + closableSessionQueue: newSessionCloseMinHeap(), + quit: make(chan struct{}), }, nil } @@ -182,8 +215,9 @@ func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { } cfg := &towerClientCfg{ - Config: m.cfg, - Policy: policy, + Config: m.cfg, + Policy: policy, + getSweepScript: m.getSweepScript, } client, err := newTowerClient(cfg) @@ -200,6 +234,71 @@ func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { func (m *Manager) Start() error { var returnErr error m.started.Do(func() { + chanSub, err := m.cfg.SubscribeChannelEvents() + if err != nil { + returnErr = err + + return + } + + // Iterate over the list of registered channels and check if any + // of them can be marked as closed. 
+ for id := range m.chanInfos { + isClosed, closedHeight, err := m.isChannelClosed(id) + if err != nil { + returnErr = err + + return + } + + if !isClosed { + continue + } + + _, err = m.cfg.DB.MarkChannelClosed(id, closedHeight) + if err != nil { + log.Errorf("could not mark channel(%s) as "+ + "closed: %v", id, err) + + continue + } + + // Since the channel has been marked as closed, we can + // also remove it from the channel summaries map. + delete(m.chanInfos, id) + } + + // Load all closable sessions. + closableSessions, err := m.cfg.DB.ListClosableSessions() + if err != nil { + returnErr = err + + return + } + + err = m.trackClosableSessions(closableSessions) + if err != nil { + returnErr = err + + return + } + + m.wg.Add(1) + go m.handleChannelCloses(chanSub) + + // Subscribe to new block events. + blockEvents, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn( + nil, + ) + if err != nil { + returnErr = err + + return + } + + m.wg.Add(1) + go m.handleClosableSessions(blockEvents) + m.clientsMu.Lock() defer m.clientsMu.Unlock() @@ -221,6 +320,9 @@ func (m *Manager) Stop() error { m.clientsMu.Lock() defer m.clientsMu.Unlock() + close(m.quit) + m.wg.Wait() + for _, client := range m.clients { if err := client.stop(); err != nil { returnErr = err @@ -402,3 +504,387 @@ func (m *Manager) Policy(blobType blob.Type) (wtpolicy.Policy, error) { return client.policy(), nil } + +// RegisterChannel persistently initializes any channel-dependent parameters +// within the client. This should be called during link startup to ensure that +// the client is able to support the link during operation. +func (m *Manager) RegisterChannel(id lnwire.ChannelID, + chanType channeldb.ChannelType) error { + + blobType := blob.TypeAltruistCommit + if chanType.HasAnchors() { + blobType = blob.TypeAltruistAnchorCommit + } + + m.clientsMu.Lock() + if _, ok := m.clients[blobType]; !ok { + m.clientsMu.Unlock() + + return fmt.Errorf("no client registered for blob type %s", + blobType) + } + m.clientsMu.Unlock() + + m.backupMu.Lock() + defer m.backupMu.Unlock() + + // If a pkscript for this channel already exists, the channel has been + // previously registered. + if _, ok := m.chanInfos[id]; ok { + // Keep track of which blob type this channel will use for + // updates. + m.chanBlobType[id] = blobType + + return nil + } + + // Otherwise, generate a new sweep pkscript used to sweep funds for this + // channel. + pkScript, err := m.cfg.NewAddress() + if err != nil { + return err + } + + // Persist the sweep pkscript so that restarts will not introduce + // address inflation when the channel is reregistered after a restart. + err = m.cfg.DB.RegisterChannel(id, pkScript) + if err != nil { + return err + } + + // Finally, cache the pkscript in our in-memory cache to avoid db + // lookups for the remainder of the daemon's execution. + m.chanInfos[id] = &wtdb.ChannelInfo{ + ClientChanSummary: wtdb.ClientChanSummary{ + SweepPkScript: pkScript, + }, + } + + // Keep track of which blob type this channel will use for updates. + m.chanBlobType[id] = blobType + + return nil +} + +// BackupState initiates a request to back up a particular revoked state. If the +// method returns nil, the backup is guaranteed to be successful unless the +// justice transaction would create dust outputs when trying to abide by the +// negotiated policy. 
+func (m *Manager) BackupState(chanID *lnwire.ChannelID, stateNum uint64) error { + select { + case <-m.quit: + return ErrClientExiting + default: + } + + // Make sure that this channel is registered with the tower client. + m.backupMu.Lock() + info, ok := m.chanInfos[*chanID] + if !ok { + m.backupMu.Unlock() + + return ErrUnregisteredChannel + } + + // Ignore backups that have already been presented to the client. + var duplicate bool + info.MaxHeight.WhenSome(func(maxHeight uint64) { + if stateNum <= maxHeight { + duplicate = true + } + }) + if duplicate { + m.backupMu.Unlock() + + log.Debugf("Ignoring duplicate backup for chanid=%v at "+ + "height=%d", chanID, stateNum) + + return nil + } + + // This backup has a higher commit height than any known backup for this + // channel. We'll update our tip so that we won't accept it again if the + // link flaps. + m.chanInfos[*chanID].MaxHeight = fn.Some(stateNum) + + blobType, ok := m.chanBlobType[*chanID] + if !ok { + m.backupMu.Unlock() + + return ErrUnregisteredChannel + } + m.backupMu.Unlock() + + m.clientsMu.Lock() + client, ok := m.clients[blobType] + if !ok { + m.clientsMu.Unlock() + + return fmt.Errorf("no client registered for blob type %s", + blobType) + } + m.clientsMu.Unlock() + + return client.backupState(chanID, stateNum) +} + +// isChanClosed can be used to check if the channel with the given ID has been +// closed. If it has been, the block height in which its closing transaction was +// mined will also be returned. +func (m *Manager) isChannelClosed(id lnwire.ChannelID) (bool, uint32, + error) { + + chanSum, err := m.cfg.FetchClosedChannel(id) + if errors.Is(err, channeldb.ErrClosedChannelNotFound) { + return false, 0, nil + } else if err != nil { + return false, 0, err + } + + return true, chanSum.CloseHeight, nil +} + +// trackClosableSessions takes in a map of session IDs to the earliest block +// height at which the session should be deleted. For each of the sessions, +// a random delay is added to the block height and the session is added to the +// closableSessionQueue. +func (m *Manager) trackClosableSessions( + sessions map[wtdb.SessionID]uint32) error { + + // For each closable session, add a random delay to its close + // height and add it to the closableSessionQueue. + for sID, blockHeight := range sessions { + delay, err := newRandomDelay(m.cfg.SessionCloseRange) + if err != nil { + return err + } + + deleteHeight := blockHeight + delay + + m.closableSessionQueue.Push(&sessionCloseItem{ + sessionID: sID, + deleteHeight: deleteHeight, + }) + } + + return nil +} + +// handleChannelCloses listens for channel close events and marks channels as +// closed in the DB. +// +// NOTE: This method MUST be run as a goroutine. +func (m *Manager) handleChannelCloses(chanSub subscribe.Subscription) { + defer m.wg.Done() + + log.Debugf("Starting channel close handler") + defer log.Debugf("Stopping channel close handler") + + for { + select { + case update, ok := <-chanSub.Updates(): + if !ok { + log.Debugf("Channel notifier has exited") + return + } + + // We only care about channel-close events. 
+ event, ok := update.(channelnotifier.ClosedChannelEvent) + if !ok { + continue + } + + chanID := lnwire.NewChanIDFromOutPoint( + &event.CloseSummary.ChanPoint, + ) + + log.Debugf("Received ClosedChannelEvent for "+ + "channel: %s", chanID) + + err := m.handleClosedChannel( + chanID, event.CloseSummary.CloseHeight, + ) + if err != nil { + log.Errorf("Could not handle channel close "+ + "event for channel(%s): %v", chanID, + err) + } + + case <-m.quit: + return + } + } +} + +// handleClosedChannel handles the closure of a single channel. It will mark the +// channel as closed in the DB, then it will handle all the sessions that are +// now closable due to the channel closure. +func (m *Manager) handleClosedChannel(chanID lnwire.ChannelID, + closeHeight uint32) error { + + m.backupMu.Lock() + defer m.backupMu.Unlock() + + // We only care about channels registered with the tower client. + if _, ok := m.chanInfos[chanID]; !ok { + return nil + } + + log.Debugf("Marking channel(%s) as closed", chanID) + + sessions, err := m.cfg.DB.MarkChannelClosed(chanID, closeHeight) + if err != nil { + return fmt.Errorf("could not mark channel(%s) as closed: %w", + chanID, err) + } + + closableSessions := make(map[wtdb.SessionID]uint32, len(sessions)) + for _, sess := range sessions { + closableSessions[sess] = closeHeight + } + + log.Debugf("Tracking %d new closable sessions as a result of "+ + "closing channel %s", len(closableSessions), chanID) + + err = m.trackClosableSessions(closableSessions) + if err != nil { + return fmt.Errorf("could not track closable sessions: %w", err) + } + + delete(m.chanInfos, chanID) + + return nil +} + +// handleClosableSessions listens for new block notifications. For each block, +// it checks the closableSessionQueue to see if there is a closable session with +// a delete-height smaller than or equal to the new block, if there is then the +// tower is informed that it can delete the session, and then we also delete it +// from our DB. +func (m *Manager) handleClosableSessions( + blocksChan *chainntnfs.BlockEpochEvent) { + + defer m.wg.Done() + + log.Debug("Starting closable sessions handler") + defer log.Debug("Stopping closable sessions handler") + + for { + select { + case newBlock := <-blocksChan.Epochs: + if newBlock == nil { + return + } + + height := uint32(newBlock.Height) + for { + select { + case <-m.quit: + return + default: + } + + // If there are no closable sessions that we + // need to handle, then we are done and can + // reevaluate when the next block comes. + item := m.closableSessionQueue.Top() + if item == nil { + break + } + + // If there is closable session but the delete + // height we have set for it is after the + // current block height, then our work is done. + if item.deleteHeight > height { + break + } + + // Otherwise, we pop this item from the heap + // and handle it. + m.closableSessionQueue.Pop() + + // Fetch the session from the DB so that we can + // extract the Tower info. + sess, err := m.cfg.DB.GetClientSession( + item.sessionID, + ) + if err != nil { + log.Errorf("error calling "+ + "GetClientSession for "+ + "session %s: %v", + item.sessionID, err) + + continue + } + + // get appropriate client. + m.clientsMu.Lock() + client, ok := m.clients[sess.Policy.BlobType] + if !ok { + m.clientsMu.Unlock() + log.Errorf("no client currently " + + "active for the session type") + + continue + } + m.clientsMu.Unlock() + + clientName, err := client.policy().BlobType. 
+ Identifier() + if err != nil { + log.Errorf("could not get client "+ + "identifier: %v", err) + + continue + } + + // Stop the session and remove it from the + // in-memory set. + err = client.stopAndRemoveSession( + item.sessionID, + ) + if err != nil { + log.Errorf("could not remove "+ + "session(%s) from in-memory "+ + "set of the %s client: %v", + item.sessionID, clientName, err) + + continue + } + + err = client.deleteSessionFromTower(sess) + if err != nil { + log.Errorf("error deleting "+ + "session %s from tower: %v", + sess.ID, err) + + continue + } + + err = m.cfg.DB.DeleteSession(item.sessionID) + if err != nil { + log.Errorf("could not delete "+ + "session(%s) from DB: %w", + sess.ID, err) + + continue + } + } + + case <-m.quit: + return + } + } +} + +func (m *Manager) getSweepScript(id lnwire.ChannelID) ([]byte, bool) { + m.backupMu.Lock() + defer m.backupMu.Unlock() + + summary, ok := m.chanInfos[id] + if !ok { + return nil, false + } + + return summary.SweepPkScript, true +} From e800aacff430b66ea9e73ae65a8e4056ea6acd0f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 14:53:54 +0200 Subject: [PATCH 11/12] wtclient+server: unexport and rename TowerClient Rename and unexport the `TowerClient` struct to `client` and rename the `TowerClientManager` interface to `ClientManager`. --- lnrpc/wtclientrpc/config.go | 2 +- peer/brontide.go | 4 +- server.go | 27 +++------- subrpcserver_config.go | 2 +- watchtower/wtclient/client.go | 79 ++++++++++++++-------------- watchtower/wtclient/client_test.go | 5 +- watchtower/wtclient/manager.go | 44 ++++++++++------ watchtower/wtclient/session_queue.go | 2 +- 8 files changed, 80 insertions(+), 85 deletions(-) diff --git a/lnrpc/wtclientrpc/config.go b/lnrpc/wtclientrpc/config.go index 58566c19a..b95fb1197 100644 --- a/lnrpc/wtclientrpc/config.go +++ b/lnrpc/wtclientrpc/config.go @@ -17,7 +17,7 @@ type Config struct { // ClientMgr is a tower client manager that manages a set of tower // clients. - ClientMgr wtclient.TowerClientManager + ClientMgr wtclient.ClientManager // Resolver is a custom resolver that will be used to resolve watchtower // addresses to ensure we don't leak any information when running over diff --git a/peer/brontide.go b/peer/brontide.go index 1751ef5ac..e593bf393 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -269,7 +269,7 @@ type Config struct { HtlcNotifier *htlcswitch.HtlcNotifier // TowerClient is used to backup revoked states. - TowerClient wtclient.TowerClientManager + TowerClient wtclient.ClientManager // DisconnectPeer is used to disconnect this peer if the cooperative close // process fails. @@ -1036,7 +1036,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update) } - var towerClient wtclient.TowerClientManager + var towerClient wtclient.ClientManager if lnChan.ChanType().IsTaproot() { // Leave the tower client as nil for now until the tower client // has support for taproot channels. diff --git a/server.go b/server.go index 076802606..fd53ee5e6 100644 --- a/server.go +++ b/server.go @@ -1546,6 +1546,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr, fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID + // Copy the policy for legacy channels and set the blob flag + // signalling support for anchor channels. 
+ anchorPolicy := policy + anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel) + s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{ FetchClosedChannel: fetchClosedChannel, BuildBreachRetribution: buildBreachRetribution, @@ -1567,25 +1572,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, MinBackoff: 10 * time.Second, MaxBackoff: 5 * time.Minute, MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, - }) - if err != nil { - return nil, err - } - - // Register a legacy tower client. - _, err = s.towerClientMgr.NewClient(policy) - if err != nil { - return nil, err - } - - // Copy the policy for legacy channels and set the blob flag - // signalling support for anchor channels. - anchorPolicy := policy - anchorPolicy.TxPolicy.BlobType |= - blob.Type(blob.FlagAnchorChannel) - - // Register an anchors tower client. - _, err = s.towerClientMgr.NewClient(anchorPolicy) + }, policy, anchorPolicy) if err != nil { return nil, err } @@ -3782,7 +3769,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // though the underlying value is nil. To avoid this gotcha which can // cause a panic, we need to explicitly pass nil to the peer.Config's // TowerClient if needed. - var towerClient wtclient.TowerClientManager + var towerClient wtclient.ClientManager if s.towerClientMgr != nil { towerClient = s.towerClientMgr } diff --git a/subrpcserver_config.go b/subrpcserver_config.go index 175515de3..360d7fa31 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -113,7 +113,7 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, chanStateDB *channeldb.ChannelStateDB, sweeper *sweep.UtxoSweeper, tower *watchtower.Standalone, - towerClientMgr wtclient.TowerClientManager, + towerClientMgr wtclient.ClientManager, tcpResolver lncfg.TCPResolver, genInvoiceFeatures func() *lnwire.FeatureVector, genAmpInvoiceFeatures func() *lnwire.FeatureVector, diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index bacc45ab1..24d974ced 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -49,7 +49,7 @@ const ( // genSessionFilter constructs a filter that can be used to select sessions only // if they match the policy of the client (namely anchor vs legacy). If // activeOnly is set, then only active sessions will be returned. -func (c *TowerClient) genSessionFilter( +func (c *client) genSessionFilter( activeOnly bool) wtdb.ClientSessionFilterFn { return func(session *wtdb.ClientSession) bool { @@ -92,7 +92,7 @@ type BreachRetributionBuilder func(id lnwire.ChannelID, commitHeight uint64) (*lnwallet.BreachRetribution, channeldb.ChannelType, error) -// newTowerMsg is an internal message we'll use within the TowerClient to signal +// newTowerMsg is an internal message we'll use within the client to signal // that a new tower can be considered. type newTowerMsg struct { // tower holds the info about the new Tower or new tower address @@ -106,7 +106,7 @@ type newTowerMsg struct { errChan chan error } -// staleTowerMsg is an internal message we'll use within the TowerClient to +// staleTowerMsg is an internal message we'll use within the client to // signal that a tower should no longer be considered. type staleTowerMsg struct { // id is the unique database identifier for the tower. @@ -128,8 +128,8 @@ type staleTowerMsg struct { errChan chan error } -// towerClientCfg holds the configuration values required by a TowerClient. 
-type towerClientCfg struct { +// clientCfg holds the configuration values required by a client. +type clientCfg struct { *Config // Policy is the session policy the client will propose when creating @@ -141,11 +141,10 @@ type towerClientCfg struct { getSweepScript func(lnwire.ChannelID) ([]byte, bool) } -// TowerClient is a concrete implementation of the Client interface, offering a -// non-blocking, reliable subsystem for backing up revoked states to a specified -// private tower. -type TowerClient struct { - cfg *towerClientCfg +// client manages backing up revoked states for all states that fall under a +// specific policy type. +type client struct { + cfg *clientCfg log btclog.Logger @@ -169,9 +168,9 @@ type TowerClient struct { quit chan struct{} } -// newTowerClient initializes a new TowerClient from the provided -// towerClientCfg. An error is returned if the client could not be initialized. -func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { +// newClient initializes a new client from the provided clientCfg. An error is +// returned if the client could not be initialized. +func newClient(cfg *clientCfg) (*client, error) { identifier, err := cfg.Policy.BlobType.Identifier() if err != nil { return nil, err @@ -188,7 +187,7 @@ func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { return nil, err } - c := &TowerClient{ + c := &client{ cfg: cfg, log: plog, pipeline: queue, @@ -349,7 +348,7 @@ func getClientSessions(db DB, keyRing ECDHKeyRing, forTower *wtdb.TowerID, // start initializes the watchtower client by loading or negotiating an active // session and then begins processing backup tasks from the request pipeline. -func (c *TowerClient) start() error { +func (c *client) start() error { c.log.Infof("Watchtower client starting") // First, restart a session queue for any sessions that have @@ -397,7 +396,7 @@ func (c *TowerClient) start() error { } // stop idempotently initiates a graceful shutdown of the watchtower client. -func (c *TowerClient) stop() error { +func (c *client) stop() error { var returnErr error c.log.Debugf("Stopping watchtower client") @@ -452,7 +451,7 @@ func (c *TowerClient) stop() error { // negotiated policy, or // - breached outputs contain too little value to sweep at the target sweep // fee rate. -func (c *TowerClient) backupState(chanID *lnwire.ChannelID, +func (c *client) backupState(chanID *lnwire.ChannelID, stateNum uint64) error { id := &wtdb.BackupID{ @@ -468,7 +467,7 @@ func (c *TowerClient) backupState(chanID *lnwire.ChannelID, // active client's advertised policy will be ignored, but may be resumed if the // client is restarted with a matching policy. If no candidates were found, nil // is returned to signal that we need to request a new policy. -func (c *TowerClient) nextSessionQueue() (*sessionQueue, error) { +func (c *client) nextSessionQueue() (*sessionQueue, error) { // Select any candidate session at random, and remove it from the set of // candidate sessions. var candidateSession *ClientSession @@ -508,13 +507,13 @@ func (c *TowerClient) nextSessionQueue() (*sessionQueue, error) { // stopAndRemoveSession stops the session with the given ID and removes it from // the in-memory active sessions set. -func (c *TowerClient) stopAndRemoveSession(id wtdb.SessionID) error { +func (c *client) stopAndRemoveSession(id wtdb.SessionID) error { return c.activeSessions.StopAndRemove(id) } // deleteSessionFromTower dials the tower that we created the session with and // attempts to send the tower the DeleteSession message. 
-func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error { +func (c *client) deleteSessionFromTower(sess *wtdb.ClientSession) error { // First, we check if we have already loaded this tower in our // candidate towers iterator. tower, err := c.candidateTowers.GetTower(sess.TowerID) @@ -638,10 +637,10 @@ func (c *TowerClient) deleteSessionFromTower(sess *wtdb.ClientSession) error { // backupDispatcher processes events coming from the taskPipeline and is // responsible for detecting when the client needs to renegotiate a session to -// fulfill continuing demand. The event loop exits if the TowerClient is quit. +// fulfill continuing demand. The event loop exits if the client is quit. // // NOTE: This method MUST be run as a goroutine. -func (c *TowerClient) backupDispatcher() { +func (c *client) backupDispatcher() { defer c.wg.Done() c.log.Tracef("Starting backup dispatcher") @@ -787,7 +786,7 @@ func (c *TowerClient) backupDispatcher() { // sessionQueue hasn't been exhausted before proceeding to the next task. Tasks // that are rejected because the active sessionQueue is full will be cached as // the prevTask, and should be reprocessed after obtaining a new sessionQueue. -func (c *TowerClient) processTask(task *wtdb.BackupID) { +func (c *client) processTask(task *wtdb.BackupID) { script, ok := c.cfg.getSweepScript(task.ChanID) if !ok { log.Infof("not processing task for unregistered channel: %s", @@ -811,7 +810,7 @@ func (c *TowerClient) processTask(task *wtdb.BackupID) { // prevTask is always removed as a result of this call. The client's // sessionQueue will be removed if accepting the task left the sessionQueue in // an exhausted state. -func (c *TowerClient) taskAccepted(task *wtdb.BackupID, +func (c *client) taskAccepted(task *wtdb.BackupID, newStatus sessionQueueStatus) { c.log.Infof("Queued %v successfully for session %v", task, @@ -849,7 +848,7 @@ func (c *TowerClient) taskAccepted(task *wtdb.BackupID, // exhausted and not shutting down, the client marks the task as ineligible, as // this implies we couldn't construct a valid justice transaction given the // session's policy. -func (c *TowerClient) taskRejected(task *wtdb.BackupID, +func (c *client) taskRejected(task *wtdb.BackupID, curStatus sessionQueueStatus) { switch curStatus { @@ -910,7 +909,7 @@ func (c *TowerClient) taskRejected(task *wtdb.BackupID, // dial connects the peer at addr using privKey as our secret key for the // connection. The connection will use the configured Net's resolver to resolve // the address for either Tor or clear net connections. -func (c *TowerClient) dial(localKey keychain.SingleKeyECDH, +func (c *client) dial(localKey keychain.SingleKeyECDH, addr *lnwire.NetAddress) (wtserver.Peer, error) { return c.cfg.AuthDial(localKey, addr, c.cfg.Dial) @@ -920,7 +919,7 @@ func (c *TowerClient) dial(localKey keychain.SingleKeyECDH, // error is returned if a message is not received before the server's read // timeout, the read off the wire failed, or the message could not be // deserialized. -func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { +func (c *client) readMessage(peer wtserver.Peer) (wtwire.Message, error) { // Set a read timeout to ensure we drop the connection if nothing is // received in a timely manner. err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout)) @@ -954,7 +953,7 @@ func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { } // sendMessage sends a watchtower wire message to the target peer. 
-func (c *TowerClient) sendMessage(peer wtserver.Peer, +func (c *client) sendMessage(peer wtserver.Peer, msg wtwire.Message) error { // Encode the next wire message into the buffer. @@ -988,7 +987,7 @@ func (c *TowerClient) sendMessage(peer wtserver.Peer, // newSessionQueue creates a sessionQueue from a ClientSession loaded from the // database and supplying it with the resources needed by the client. -func (c *TowerClient) newSessionQueue(s *ClientSession, +func (c *client) newSessionQueue(s *ClientSession, updates []wtdb.CommittedUpdate) *sessionQueue { return newSessionQueue(&sessionQueueConfig{ @@ -1010,7 +1009,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession, // getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the // passed ClientSession. If it exists, the active sessionQueue is returned. // Otherwise, a new sessionQueue is initialized and added to the set. -func (c *TowerClient) getOrInitActiveQueue(s *ClientSession, +func (c *client) getOrInitActiveQueue(s *ClientSession, updates []wtdb.CommittedUpdate) *sessionQueue { if sq, ok := c.activeSessions.Get(s.ID); ok { @@ -1024,7 +1023,7 @@ func (c *TowerClient) getOrInitActiveQueue(s *ClientSession, // adds the sessionQueue to the activeSessions set, and starts the sessionQueue // so that it can deliver any committed updates or begin accepting newly // assigned tasks. -func (c *TowerClient) initActiveQueue(s *ClientSession, +func (c *client) initActiveQueue(s *ClientSession, updates []wtdb.CommittedUpdate) *sessionQueue { // Initialize the session queue, providing it with all the resources it @@ -1044,7 +1043,7 @@ func (c *TowerClient) initActiveQueue(s *ClientSession, // it for new sessions. If the watchtower already exists, then any new addresses // included will be considered when dialing it for session negotiations and // backups. -func (c *TowerClient) addTower(tower *Tower) error { +func (c *client) addTower(tower *Tower) error { errChan := make(chan error, 1) select { @@ -1067,7 +1066,7 @@ func (c *TowerClient) addTower(tower *Tower) error { // handleNewTower handles a request for a new tower to be added. If the tower // already exists, then its corresponding sessions, if any, will be set // considered as candidates. -func (c *TowerClient) handleNewTower(tower *Tower) error { +func (c *client) handleNewTower(tower *Tower) error { c.candidateTowers.AddCandidate(tower) // Include all of its corresponding sessions to our set of candidates. @@ -1091,7 +1090,7 @@ func (c *TowerClient) handleNewTower(tower *Tower) error { // negotiations and from being used for any subsequent backups until it's added // again. If an address is provided, then this call only serves as a way of // removing the address from the watchtower instead. -func (c *TowerClient) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey, +func (c *client) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey, addr net.Addr) error { errChan := make(chan error, 1) @@ -1119,7 +1118,7 @@ func (c *TowerClient) removeTower(id wtdb.TowerID, pubKey *btcec.PublicKey, // none of the tower's sessions have pending updates, then they will become // inactive and removed as candidates. If the active session queue corresponds // to any of these sessions, a new one will be negotiated. -func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { +func (c *client) handleStaleTower(msg *staleTowerMsg) error { // We'll first update our in-memory state. 
err := c.candidateTowers.RemoveCandidate(msg.id, msg.addr) if err != nil { @@ -1168,7 +1167,7 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { // registeredTowers retrieves the list of watchtowers registered with the // client. -func (c *TowerClient) registeredTowers(towers []*wtdb.Tower, +func (c *client) registeredTowers(towers []*wtdb.Tower, opts ...wtdb.ClientSessionListOption) ([]*RegisteredTower, error) { // Generate a filter that will fetch all the client's sessions @@ -1209,7 +1208,7 @@ func (c *TowerClient) registeredTowers(towers []*wtdb.Tower, // lookupTower retrieves the info of sessions held with the given tower handled // by this client. -func (c *TowerClient) lookupTower(tower *wtdb.Tower, +func (c *client) lookupTower(tower *wtdb.Tower, opts ...wtdb.ClientSessionListOption) (*RegisteredTower, error) { opts = append(opts, wtdb.WithPreEvalFilterFn(c.genSessionFilter(false))) @@ -1227,19 +1226,19 @@ func (c *TowerClient) lookupTower(tower *wtdb.Tower, } // getStats returns the in-memory statistics of the client since startup. -func (c *TowerClient) getStats() ClientStats { +func (c *client) getStats() ClientStats { return c.stats.getStatsCopy() } // policy returns the active client policy configuration. -func (c *TowerClient) policy() wtpolicy.Policy { +func (c *client) policy() wtpolicy.Policy { return c.cfg.Policy } // logMessage writes information about a message received from a remote peer, // using directional prepositions to signal whether the message was sent or // received. -func (c *TowerClient) logMessage( +func (c *client) logMessage( peer wtserver.Peer, msg wtwire.Message, read bool) { var action = "Received" diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 452404919..95e2664eb 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -560,10 +560,7 @@ func (h *testHarness) startClient() { Address: towerTCPAddr, } - h.clientMgr, err = wtclient.NewManager(h.clientCfg) - require.NoError(h.t, err) - - _, err = h.clientMgr.NewClient(h.clientPolicy) + h.clientMgr, err = wtclient.NewManager(h.clientCfg, h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.clientMgr.Start()) require.NoError(h.t, h.clientMgr.AddTower(towerAddr)) diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go index be0042f43..73f259085 100644 --- a/watchtower/wtclient/manager.go +++ b/watchtower/wtclient/manager.go @@ -22,9 +22,9 @@ import ( "github.com/lightningnetwork/lnd/watchtower/wtpolicy" ) -// TowerClientManager is the primary interface used by the daemon to control a +// ClientManager is the primary interface used by the daemon to control a // client's lifecycle and backup revoked states. -type TowerClientManager interface { +type ClientManager interface { // AddTower adds a new watchtower reachable at the given address and // considers it for new sessions. If the watchtower already exists, then // any new addresses included will be considered when dialing it for @@ -67,7 +67,7 @@ type TowerClientManager interface { BackupState(chanID *lnwire.ChannelID, stateNum uint64) error } -// Config provides the TowerClient with access to the resources it requires to +// Config provides the client with access to the resources it requires to // perform its duty. All nillable fields must be non-nil for the tower to be // initialized properly. 
type Config struct { @@ -156,7 +156,7 @@ type Manager struct { cfg *Config - clients map[blob.Type]*TowerClient + clients map[blob.Type]*client clientsMu sync.Mutex backupMu sync.Mutex @@ -169,10 +169,10 @@ type Manager struct { quit chan struct{} } -var _ TowerClientManager = (*Manager)(nil) +var _ ClientManager = (*Manager)(nil) // NewManager constructs a new Manager. -func NewManager(config *Config) (*Manager, error) { +func NewManager(config *Config, policies ...wtpolicy.Policy) (*Manager, error) { // Copy the config to prevent side effects from modifying both the // internal and external version of the Config. cfg := *config @@ -192,42 +192,54 @@ func NewManager(config *Config) (*Manager, error) { return nil, err } - return &Manager{ + m := &Manager{ cfg: &cfg, - clients: make(map[blob.Type]*TowerClient), + clients: make(map[blob.Type]*client), chanBlobType: make(map[lnwire.ChannelID]blob.Type), chanInfos: chanInfos, closableSessionQueue: newSessionCloseMinHeap(), quit: make(chan struct{}), - }, nil + } + + for _, policy := range policies { + if err = policy.Validate(); err != nil { + return nil, err + } + + if err = m.newClient(policy); err != nil { + return nil, err + } + } + + return m, nil } -// NewClient constructs a new TowerClient and adds it to the set of clients that +// newClient constructs a new client and adds it to the set of clients that // the Manager is keeping track of. -func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { +func (m *Manager) newClient(policy wtpolicy.Policy) error { m.clientsMu.Lock() defer m.clientsMu.Unlock() _, ok := m.clients[policy.BlobType] if ok { - return nil, fmt.Errorf("a client with blob type %s has "+ + return fmt.Errorf("a client with blob type %s has "+ "already been registered", policy.BlobType) } - cfg := &towerClientCfg{ + cfg := &clientCfg{ Config: m.cfg, Policy: policy, getSweepScript: m.getSweepScript, } - client, err := newTowerClient(cfg) + client, err := newClient(cfg) if err != nil { - return nil, err + return err } m.clients[policy.BlobType] = client - return client, nil + return nil } // Start starts all the clients that have been registered with the Manager. diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index 27c36c6fe..1bd7a4ddb 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -94,7 +94,7 @@ type sessionQueueConfig struct { // sessionQueue implements a reliable queue that will encrypt and send accepted // backups to the watchtower specified in the config's ClientSession. Calling // Stop will attempt to perform a clean shutdown replaying any un-committed -// pending updates to the TowerClient's main task pipeline. +// pending updates to the client's main task pipeline. 
type sessionQueue struct { started sync.Once stopped sync.Once From 59ebe02fa478586eea4b08fefcc552c996d74e59 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 15:09:17 +0200 Subject: [PATCH 12/12] docs: update release notes --- docs/release-notes/release-notes-0.18.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.18.0.md b/docs/release-notes/release-notes-0.18.0.md index 0dde33b8e..bbdc7589b 100644 --- a/docs/release-notes/release-notes-0.18.0.md +++ b/docs/release-notes/release-notes-0.18.0.md @@ -119,6 +119,10 @@ In particular, the complexity involved in the lifecycle loop has been decoupled into logical steps, with each step having its own responsibility, making it easier to reason about the payment flow. + +* [Add a watchtower tower client + multiplexer](https://github.com/lightningnetwork/lnd/pull/7702) to manage + tower clients of different types. ## Breaking Changes ## Performance Improvements
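
Note on the resulting API (illustrative only): after this series, callers construct a single wtclient.Manager with one policy per desired client and interact only with the Manager, which multiplexes RegisterChannel and BackupState calls to the client whose blob type matches the channel. The sketch below assumes a populated *wtclient.Config and base policy, mirroring what server.go does; the function and variable names (startTowerBackups, basePolicy) are illustrative and not part of the patches.

package sketch

import (
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/watchtower/blob"
	"github.com/lightningnetwork/lnd/watchtower/wtclient"
	"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
)

// startTowerBackups shows the intended call pattern after PATCH 11/12. Only
// the Manager methods used here (NewManager, Start, Stop, RegisterChannel and
// BackupState) come from this series; everything else is assumed to be wired
// up elsewhere, as server.go does.
func startTowerBackups(cfg *wtclient.Config, basePolicy wtpolicy.Policy,
	chanID lnwire.ChannelID, chanType channeldb.ChannelType,
	stateNum uint64) error {

	// Derive the anchor policy from the legacy one by setting the anchor
	// channel flag, mirroring server.go.
	anchorPolicy := basePolicy
	anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)

	// One Manager owns one client per policy. Each policy must use a
	// distinct blob type, since the Manager indexes clients by blob type.
	mgr, err := wtclient.NewManager(cfg, basePolicy, anchorPolicy)
	if err != nil {
		return err
	}
	if err := mgr.Start(); err != nil {
		return err
	}
	defer func() { _ = mgr.Stop() }()

	// Callers no longer choose a client themselves; the Manager maps the
	// channel type to a blob type and forwards to the matching client.
	if err := mgr.RegisterChannel(chanID, chanType); err != nil {
		return err
	}

	return mgr.BackupState(&chanID, stateNum)
}

Because the Manager keys its clients by blob type, passing two policies that share a blob type to NewManager is rejected ("a client with blob type ... has already been registered"), which is why server.go flips the anchor flag on a copy of the base policy before handing both policies to the Manager.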