wtclient: make sessionQueueSet thread safe

In preparation for an upcoming commit where multiple threads will have
access to the TowerClient sessionQueueSet, we turn it into a thread safe
struct.
This commit is contained in:
Elle Mouton 2023-03-23 09:45:23 +02:00
parent c4fec3ebc9
commit 25c4d3f1f7
No known key found for this signature in database
GPG Key ID: D7D916376026F177
2 changed files with 56 additions and 14 deletions

View File

@ -306,7 +306,7 @@ type TowerClient struct {
negotiator SessionNegotiator
candidateTowers TowerCandidateIterator
candidateSessions map[wtdb.SessionID]*ClientSession
activeSessions sessionQueueSet
activeSessions *sessionQueueSet
sessionQueue *sessionQueue
prevTask *wtdb.BackupID
@ -378,7 +378,7 @@ func New(config *Config) (*TowerClient, error) {
log: plog,
pipeline: queue,
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: make(sessionQueueSet),
activeSessions: newSessionQueueSet(),
summaries: chanSummaries,
closableSessionQueue: newSessionCloseMinHeap(),
statTicker: time.NewTicker(DefaultStatInterval),
@ -1609,7 +1609,7 @@ func (c *TowerClient) newSessionQueue(s *ClientSession,
func (c *TowerClient) getOrInitActiveQueue(s *ClientSession,
updates []wtdb.CommittedUpdate) *sessionQueue {
if sq, ok := c.activeSessions[s.ID]; ok {
if sq, ok := c.activeSessions.Get(s.ID); ok {
return sq
}
@ -1628,12 +1628,10 @@ func (c *TowerClient) initActiveQueue(s *ClientSession,
sq := c.newSessionQueue(s, updates)
// Add the session queue as an active session so that we remember to
// stop it on shutdown.
c.activeSessions.Add(sq)
// Start the queue so that it can be active in processing newly assigned
// tasks or to upload previously committed updates.
sq.Start()
// stop it on shutdown. This method will also start the queue so that it
// can be active in processing newly assigned tasks or to upload
// previously committed updates.
c.activeSessions.AddAndStart(sq)
return sq
}

View File

@ -682,19 +682,63 @@ func (q *sessionQueue) signalUntilShutdown() {
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
// sessionQueue.
type sessionQueueSet map[wtdb.SessionID]*sessionQueue
type sessionQueueSet struct {
queues map[wtdb.SessionID]*sessionQueue
mu sync.Mutex
}
// Add inserts a sessionQueue into the sessionQueueSet.
func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) {
(*s)[*sessionQueue.ID()] = sessionQueue
// newSessionQueueSet constructs a new sessionQueueSet.
func newSessionQueueSet() *sessionQueueSet {
return &sessionQueueSet{
queues: make(map[wtdb.SessionID]*sessionQueue),
}
}
// AddAndStart inserts a sessionQueue into the sessionQueueSet and starts it.
func (s *sessionQueueSet) AddAndStart(sessionQueue *sessionQueue) {
s.mu.Lock()
defer s.mu.Unlock()
s.queues[*sessionQueue.ID()] = sessionQueue
sessionQueue.Start()
}
// StopAndRemove stops the given session queue and removes it from the
// sessionQueueSet.
func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) {
s.mu.Lock()
defer s.mu.Unlock()
queue, ok := s.queues[id]
if !ok {
return
}
queue.Stop()
delete(s.queues, id)
}
// Get fetches and returns the sessionQueue with the given ID.
func (s *sessionQueueSet) Get(id wtdb.SessionID) (*sessionQueue, bool) {
s.mu.Lock()
defer s.mu.Unlock()
q, ok := s.queues[id]
return q, ok
}
// ApplyAndWait executes the nil-adic function returned from getApply for each
// sessionQueue in the set in parallel, then waits for all of them to finish
// before returning to the caller.
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
s.mu.Lock()
defer s.mu.Unlock()
var wg sync.WaitGroup
for _, sessionq := range *s {
for _, sessionq := range s.queues {
wg.Add(1)
go func(sq *sessionQueue) {
defer wg.Done()