diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 85d7e19f2..98e930e4f 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -306,7 +306,7 @@ type TowerClient struct { negotiator SessionNegotiator candidateTowers TowerCandidateIterator candidateSessions map[wtdb.SessionID]*ClientSession - activeSessions sessionQueueSet + activeSessions *sessionQueueSet sessionQueue *sessionQueue prevTask *wtdb.BackupID @@ -378,7 +378,7 @@ func New(config *Config) (*TowerClient, error) { log: plog, pipeline: queue, chanCommitHeights: make(map[lnwire.ChannelID]uint64), - activeSessions: make(sessionQueueSet), + activeSessions: newSessionQueueSet(), summaries: chanSummaries, closableSessionQueue: newSessionCloseMinHeap(), statTicker: time.NewTicker(DefaultStatInterval), @@ -1609,7 +1609,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession, func (c *TowerClient) getOrInitActiveQueue(s *ClientSession, updates []wtdb.CommittedUpdate) *sessionQueue { - if sq, ok := c.activeSessions[s.ID]; ok { + if sq, ok := c.activeSessions.Get(s.ID); ok { return sq } @@ -1628,12 +1628,10 @@ func (c *TowerClient) initActiveQueue(s *ClientSession, sq := c.newSessionQueue(s, updates) // Add the session queue as an active session so that we remember to - // stop it on shutdown. - c.activeSessions.Add(sq) - - // Start the queue so that it can be active in processing newly assigned - // tasks or to upload previously committed updates. - sq.Start() + // stop it on shutdown. This method will also start the queue so that it + // can be active in processing newly assigned tasks or to upload + // previously committed updates. + c.activeSessions.AddAndStart(sq) return sq } diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index aa06709ee..d33f94f5b 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -682,19 +682,63 @@ func (q *sessionQueue) signalUntilShutdown() { // sessionQueueSet maintains a mapping of SessionIDs to their corresponding // sessionQueue. -type sessionQueueSet map[wtdb.SessionID]*sessionQueue +type sessionQueueSet struct { + queues map[wtdb.SessionID]*sessionQueue + mu sync.Mutex +} -// Add inserts a sessionQueue into the sessionQueueSet. -func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) { - (*s)[*sessionQueue.ID()] = sessionQueue +// newSessionQueueSet constructs a new sessionQueueSet. +func newSessionQueueSet() *sessionQueueSet { + return &sessionQueueSet{ + queues: make(map[wtdb.SessionID]*sessionQueue), + } +} + +// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it. +func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) { + s.mu.Lock() + defer s.mu.Unlock() + + s.queues[*sessionQueue.ID()] = sessionQueue + + sessionQueue.Start() +} + +// StopAndRemove stops the given session queue and removes it from the +// sessionQueueSet. +func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) { + s.mu.Lock() + defer s.mu.Unlock() + + queue, ok := s.queues[id] + if !ok { + return + } + + queue.Stop() + + delete(s.queues, id) +} + +// Get fetches and returns the sessionQueue with the given ID. +func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) { + s.mu.Lock() + defer s.mu.Unlock() + + q, ok := s.queues[id] + + return q, ok } // ApplyAndWait executes the nil-adic function returned from getApply for each // sessionQueue in the set in parallel, then waits for all of them to finish // before returning to the caller. func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) { + s.mu.Lock() + defer s.mu.Unlock() + var wg sync.WaitGroup - for _, sessionq := range *s { + for _, sessionq := range s.queues { wg.Add(1) go func(sq *sessionQueue) { defer wg.Done()