diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index f9aa085d9..be199437b 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -2,7 +2,9 @@ package wtclient import ( "bytes" + "errors" "fmt" + "net" "sync" "time" @@ -36,9 +38,48 @@ const ( DefaultForceQuitDelay = 10 * time.Second ) +// RegisteredTower encompasses information about a registered watchtower with +// the client. +type RegisteredTower struct { + *wtdb.Tower + + // Sessions is the set of sessions corresponding to the watchtower. + Sessions map[wtdb.SessionID]*wtdb.ClientSession + + // ActiveSessionCandidate determines whether the watchtower is currently + // being considered for new sessions. + ActiveSessionCandidate bool +} + // Client is the primary interface used by the daemon to control a client's // lifecycle and backup revoked states. type Client interface { + // AddTower adds a new watchtower reachable at the given address and + // considers it for new sessions. If the watchtower already exists, then + // any new addresses included will be considered when dialing it for + // session negotiations and backups. + AddTower(*lnwire.NetAddress) error + + // RemoveTower removes a watchtower from being considered for future + // session negotiations and from being used for any subsequent backups + // until it's added again. If an address is provided, then this call + // only serves as a way of removing the address from the watchtower + // instead. + RemoveTower(*btcec.PublicKey, net.Addr) error + + // RegisteredTowers retrieves the list of watchtowers registered with + // the client. + RegisteredTowers() ([]*RegisteredTower, error) + + // LookupTower retrieves a registered watchtower through its public key. + LookupTower(*btcec.PublicKey) (*RegisteredTower, error) + + // Stats returns the in-memory statistics of the client since startup. + Stats() ClientStats + + // Policy returns the active client policy configuration. + Policy() wtpolicy.Policy + // RegisterChannel persistently initializes any channel-dependent // parameters within the client. This should be called during link // startup to ensure that the client is able to support the link during @@ -136,6 +177,39 @@ type Config struct { MaxBackoff time.Duration } +// newTowerMsg is an internal message we'll use within the TowerClient to signal +// that a new tower can be considered. +type newTowerMsg struct { + // addr is the tower's reachable address that we'll use to establish a + // connection with. + addr *lnwire.NetAddress + + // errChan is the channel through which we'll send a response back to + // the caller when handling their request. + // + // NOTE: This channel must be buffered. + errChan chan error +} + +// staleTowerMsg is an internal message we'll use within the TowerClient to +// signal that a tower should no longer be considered. +type staleTowerMsg struct { + // pubKey is the identifying public key of the watchtower. + pubKey *btcec.PublicKey + + // addr is an optional field that when set signals that the address + // should be removed from the watchtower's set of addresses, indicating + // that it is stale. If it's not set, then the watchtower should be + // no longer be considered for new sessions. + addr net.Addr + + // errChan is the channel through which we'll send a response back to + // the caller when handling their request. + // + // NOTE: This channel must be buffered. + errChan chan error +} + // TowerClient is a concrete implementation of the Client interface, offering a // non-blocking, reliable subsystem for backing up revoked states to a specified // private tower. @@ -161,7 +235,10 @@ type TowerClient struct { chanCommitHeights map[lnwire.ChannelID]uint64 statTicker *time.Ticker - stats ClientStats + stats *ClientStats + + newTowers chan *newTowerMsg + staleTowers chan *staleTowerMsg wg sync.WaitGroup forceQuit chan struct{} @@ -247,6 +324,9 @@ func New(config *Config) (*TowerClient, error) { activeSessions: make(sessionQueueSet), summaries: chanSummaries, statTicker: time.NewTicker(DefaultStatInterval), + stats: new(ClientStats), + newTowers: make(chan *newTowerMsg), + staleTowers: make(chan *staleTowerMsg), forceQuit: make(chan struct{}), } c.negotiator = newSessionNegotiator(&NegotiatorConfig{ @@ -587,19 +667,38 @@ func (c *TowerClient) backupDispatcher() { c.candidateSessions[session.ID] = session c.stats.sessionAcquired() + // We'll continue to choose the newly negotiated + // session as our active session queue. + continue + case <-c.statTicker.C: log.Infof("Client stats: %s", c.stats) - // Instead of looping, we'll jump back into the - // select case and await the delivery of the - // session to prevent us from re-requesting - // additional sessions. - goto awaitSession + // A new tower has been requested to be added. We'll + // update our persisted and in-memory state and consider + // its corresponding sessions, if any, as new + // candidates. + case msg := <-c.newTowers: + msg.errChan <- c.handleNewTower(msg) + + // A tower has been requested to be removed. We'll + // immediately return an error as we want to avoid the + // possibility of a new session being negotiated with + // this request's tower. + case msg := <-c.staleTowers: + msg.errChan <- errors.New("removing towers " + + "is disallowed while a new session " + + "negotiation is in progress") case <-c.forceQuit: return } + // Instead of looping, we'll jump back into the select + // case and await the delivery of the session to prevent + // us from re-requesting additional sessions. + goto awaitSession + // No active session queue but have additional sessions. case c.sessionQueue == nil && len(c.candidateSessions) > 0: // We've exhausted the prior session, we'll pop another @@ -634,7 +733,7 @@ func (c *TowerClient) backupDispatcher() { // we can request new sessions before the session is // fully empty, which this case would handle. case session := <-c.negotiator.NewSessions(): - log.Warnf("Acquired new session with id=%s", + log.Warnf("Acquired new session with id=%s "+ "while processing tasks", session.ID) c.candidateSessions[session.ID] = session c.stats.sessionAcquired() @@ -655,6 +754,20 @@ func (c *TowerClient) backupDispatcher() { c.stats.taskReceived() c.processTask(task) + + // A new tower has been requested to be added. We'll + // update our persisted and in-memory state and consider + // its corresponding sessions, if any, as new + // candidates. + case msg := <-c.newTowers: + msg.errChan <- c.handleNewTower(msg) + + // A tower has been removed, so we'll remove certain + // information that's persisted and also in our + // in-memory state depending on the request, and set any + // of its corresponding candidate sessions as inactive. + case msg := <-c.staleTowers: + msg.errChan <- c.handleStaleTower(msg) } } } @@ -884,6 +997,207 @@ func (c *TowerClient) initActiveQueue(s *wtdb.ClientSession) *sessionQueue { return sq } +// AddTower adds a new watchtower reachable at the given address and considers +// it for new sessions. If the watchtower already exists, then any new addresses +// included will be considered when dialing it for session negotiations and +// backups. +func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error { + errChan := make(chan error, 1) + + select { + case c.newTowers <- &newTowerMsg{ + addr: addr, + errChan: errChan, + }: + case <-c.pipeline.quit: + return ErrClientExiting + case <-c.pipeline.forceQuit: + return ErrClientExiting + } + + select { + case err := <-errChan: + return err + case <-c.pipeline.quit: + return ErrClientExiting + case <-c.pipeline.forceQuit: + return ErrClientExiting + } +} + +// handleNewTower handles a request for a new tower to be added. If the tower +// already exists, then its corresponding sessions, if any, will be set +// considered as candidates. +func (c *TowerClient) handleNewTower(msg *newTowerMsg) error { + // We'll start by updating our persisted state, followed by our + // in-memory state, with the new tower. This might not actually be a new + // tower, but it might include a new address at which it can be reached. + tower, err := c.cfg.DB.CreateTower(msg.addr) + if err != nil { + return err + } + c.candidateTowers.AddCandidate(tower) + + // Include all of its corresponding sessions to our set of candidates. + sessions, err := c.cfg.DB.ListClientSessions(&tower.ID) + if err != nil { + return fmt.Errorf("unable to determine sessions for tower %x: "+ + "%v", tower.IdentityKey.SerializeCompressed(), err) + } + for id, session := range sessions { + c.candidateSessions[id] = session + } + + return nil +} + +// RemoveTower removes a watchtower from being considered for future session +// negotiations and from being used for any subsequent backups until it's added +// again. If an address is provided, then this call only serves as a way of +// removing the address from the watchtower instead. +func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error { + errChan := make(chan error, 1) + + select { + case c.staleTowers <- &staleTowerMsg{ + pubKey: pubKey, + addr: addr, + errChan: errChan, + }: + case <-c.pipeline.quit: + return ErrClientExiting + case <-c.pipeline.forceQuit: + return ErrClientExiting + } + + select { + case err := <-errChan: + return err + case <-c.pipeline.quit: + return ErrClientExiting + case <-c.pipeline.forceQuit: + return ErrClientExiting + } +} + +// handleNewTower handles a request for an existing tower to be removed. If none +// of the tower's sessions have pending updates, then they will become inactive +// and removed as candidates. If the active session queue corresponds to any of +// these sessions, a new one will be negotiated. +func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { + // We'll load the tower before potentially removing it in order to + // retrieve its ID within the database. + tower, err := c.cfg.DB.LoadTower(msg.pubKey) + if err != nil { + return err + } + + // We'll update our persisted state, followed by our in-memory state, + // with the stale tower. + if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil { + return err + } + c.candidateTowers.RemoveCandidate(tower.ID, msg.addr) + + // If an address was provided, then we're only meant to remove the + // address from the tower, so there's nothing left for us to do. + if msg.addr != nil { + return nil + } + + // Otherwise, the tower should no longer be used for future session + // negotiations and backups. + pubKey := msg.pubKey.SerializeCompressed() + sessions, err := c.cfg.DB.ListClientSessions(&tower.ID) + if err != nil { + return fmt.Errorf("unable to retrieve sessions for tower %x: "+ + "%v", pubKey, err) + } + for sessionID := range sessions { + delete(c.candidateSessions, sessionID) + } + + // If our active session queue corresponds to the stale tower, we'll + // proceed to negotiate a new one. + if c.sessionQueue != nil { + activeTower := c.sessionQueue.towerAddr.IdentityKey.SerializeCompressed() + if bytes.Equal(pubKey, activeTower) { + c.sessionQueue = nil + } + } + + return nil +} + +// RegisteredTowers retrieves the list of watchtowers registered with the +// client. +func (c *TowerClient) RegisteredTowers() ([]*RegisteredTower, error) { + // Retrieve all of our towers along with all of our sessions. + towers, err := c.cfg.DB.ListTowers() + if err != nil { + return nil, err + } + clientSessions, err := c.cfg.DB.ListClientSessions(nil) + if err != nil { + return nil, err + } + + // Construct a lookup map that coalesces all of the sessions for a + // specific watchtower. + towerSessions := make( + map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession, + ) + for id, s := range clientSessions { + sessions, ok := towerSessions[s.TowerID] + if !ok { + sessions = make(map[wtdb.SessionID]*wtdb.ClientSession) + towerSessions[s.TowerID] = sessions + } + sessions[id] = s + } + + registeredTowers := make([]*RegisteredTower, 0, len(towerSessions)) + for _, tower := range towers { + isActive := c.candidateTowers.IsActive(tower.ID) + registeredTowers = append(registeredTowers, &RegisteredTower{ + Tower: tower, + Sessions: towerSessions[tower.ID], + ActiveSessionCandidate: isActive, + }) + } + + return registeredTowers, nil +} + +// LookupTower retrieves a registered watchtower through its public key. +func (c *TowerClient) LookupTower(pubKey *btcec.PublicKey) (*RegisteredTower, error) { + tower, err := c.cfg.DB.LoadTower(pubKey) + if err != nil { + return nil, err + } + + towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID) + if err != nil { + return nil, err + } + + return &RegisteredTower{ + Tower: tower, + Sessions: towerSessions, + ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID), + }, nil +} + +// Stats returns the in-memory statistics of the client since startup. +func (c *TowerClient) Stats() ClientStats { + return c.stats.Copy() +} + +// Policy returns the active client policy configuration. +func (c *TowerClient) Policy() wtpolicy.Policy { + return c.cfg.Policy +} + // logMessage writes information about a message received from a remote peer, // using directional prepositions to signal whether the message was sent or // received. diff --git a/watchtower/wtclient/stats.go b/watchtower/wtclient/stats.go index b3812b703..174a9d3db 100644 --- a/watchtower/wtclient/stats.go +++ b/watchtower/wtclient/stats.go @@ -1,10 +1,15 @@ package wtclient -import "fmt" +import ( + "fmt" + "sync" +) // ClientStats is a collection of in-memory statistics of the actions the client // has performed since its creation. type ClientStats struct { + mu sync.Mutex + // NumTasksReceived is the total number of backups that are pending to // be acknowledged by all active and exhausted watchtower sessions. NumTasksReceived int @@ -29,12 +34,16 @@ type ClientStats struct { // taskReceived increments the number to backup requests the client has received // from active channels. func (s *ClientStats) taskReceived() { + s.mu.Lock() + defer s.mu.Unlock() s.NumTasksReceived++ } // taskAccepted increments the number of tasks that have been assigned to active // session queues, and are awaiting upload to a tower. func (s *ClientStats) taskAccepted() { + s.mu.Lock() + defer s.mu.Unlock() s.NumTasksAccepted++ } @@ -43,25 +52,46 @@ func (s *ClientStats) taskAccepted() { // typically this means that the balance created dust outputs, so it may not be // worth backing up at all. func (s *ClientStats) taskIneligible() { + s.mu.Lock() + defer s.mu.Unlock() s.NumTasksIneligible++ } // sessionAcquired increments the number of sessions that have been successfully // negotiated by the client during this execution. func (s *ClientStats) sessionAcquired() { + s.mu.Lock() + defer s.mu.Unlock() s.NumSessionsAcquired++ } // sessionExhausted increments the number of session that have become full as a // result of accepting backup tasks. func (s *ClientStats) sessionExhausted() { + s.mu.Lock() + defer s.mu.Unlock() s.NumSessionsExhausted++ } // String returns a human readable summary of the client's metrics. -func (s ClientStats) String() string { +func (s *ClientStats) String() string { + s.mu.Lock() + defer s.mu.Unlock() return fmt.Sprintf("tasks(received=%d accepted=%d ineligible=%d) "+ "sessions(acquired=%d exhausted=%d)", s.NumTasksReceived, s.NumTasksAccepted, s.NumTasksIneligible, s.NumSessionsAcquired, s.NumSessionsExhausted) } + +// Copy returns a copy of the current stats. +func (s *ClientStats) Copy() ClientStats { + s.mu.Lock() + defer s.mu.Unlock() + return ClientStats{ + NumTasksReceived: s.NumTasksReceived, + NumTasksAccepted: s.NumTasksAccepted, + NumTasksIneligible: s.NumTasksIneligible, + NumSessionsAcquired: s.NumSessionsAcquired, + NumSessionsExhausted: s.NumSessionsExhausted, + } +}