From 5ddad90a17dee6c943c81c4be474e3c3792791d1 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 4 May 2022 17:46:13 -0400 Subject: [PATCH] invoices: fix OOO notifications, use cancelChan instead of canceled This commit fixes OOO notifications between backlog and non-backlog events by having the non-backlog goroutines wait on a chan that signals that backlog processing is complete. This commit also replaces usage of the canceled atomic variable with the cancelChan to signal that delivery of an event should no longer occur. Atomics do not make perfect "sequence" points as the atomic may be checked too early and the end-result of delivering to a stopped ntfnQueue is the same. Using the cancelChan ensures that we do not hang on sending to ntfnQueue. --- invoices/invoiceregistry.go | 57 +++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 5c158c7e6..99ed3aefd 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -355,12 +355,14 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { continue } - if atomic.LoadUint32(&client.canceled) == 1 { - log.Errorf("Client(id=%v) has stopped, skipped "+ - "notification for event(pay_hash=%v)", - client.id, payHash) - continue + select { + case <-client.backlogDelivered: + // We won't deliver any events until the backlog has + // went through first. + case <-i.quit: + return } + client.notify(event) } } @@ -416,11 +418,19 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { } select { - case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + case <-client.backlogDelivered: + // We won't deliver any events until the backlog has + // been processed. + case <-i.quit: + return + } + + err := client.notify(&invoiceEvent{ invoice: invoice, setID: event.setID, - }: - case <-i.quit: + }) + if err != nil { + log.Errorf("Failed dispatching to client: %v", err) return } @@ -1389,7 +1399,12 @@ type invoiceSubscriptionKit struct { canceled uint32 // To be used atomically. cancelChan chan struct{} - wg sync.WaitGroup + + // backlogDelivered is closed when the backlog events have been + // delivered. + backlogDelivered chan struct{} + + wg sync.WaitGroup } // InvoiceSubscription represents an intent to receive updates for newly added @@ -1452,6 +1467,10 @@ func (i *invoiceSubscriptionKit) Cancel() { func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error { select { case i.ntfnQueue.ChanIn() <- event: + case <-i.cancelChan: + // This can only be triggered by delivery of non-backlog + // events. + return ErrShuttingDown case <-i.quit: return ErrShuttingDown } @@ -1473,13 +1492,17 @@ func (i *InvoiceRegistry) SubscribeNotifications( addIndex: addIndex, settleIndex: settleIndex, invoiceSubscriptionKit: invoiceSubscriptionKit{ - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + backlogDelivered: make(chan struct{}), }, } client.ntfnQueue.Start() + // This notifies other goroutines that the backlog phase is over. + defer close(client.backlogDelivered) + // Always increment by 1 first, and our client ID will start with 1, // not 0. client.id = atomic.AddUint32(&i.nextClientID, 1) @@ -1567,14 +1590,18 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( client := &SingleInvoiceSubscription{ Updates: make(chan *channeldb.Invoice), invoiceSubscriptionKit: invoiceSubscriptionKit{ - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + backlogDelivered: make(chan struct{}), }, invoiceRef: channeldb.InvoiceRefByHash(hash), } client.ntfnQueue.Start() + // This notifies other goroutines that the backlog phase is done. + defer close(client.backlogDelivered) + // Always increment by 1 first, and our client ID will start with 1, // not 0. client.id = atomic.AddUint32(&i.nextClientID, 1)