invoices: fix OOO notifications, use cancelChan instead of canceled

This commit fixes OOO notifications between backlog and non-backlog
events by having the non-backlog goroutines wait on a chan that
signals that backlog processing is complete.

This commit also replaces usage of the canceled atomic variable with
the cancelChan to signal that delivery of an event should no longer
occur. Atomics do not make perfect "sequence" points as the atomic
may be checked too early and the end-result of delivering to a stopped
ntfnQueue is the same. Using the cancelChan ensures that we do not
hang on sending to ntfnQueue.
This commit is contained in:
eugene
2022-05-04 17:46:13 -04:00
parent 93f87acb59
commit 5ddad90a17

View File

@@ -355,12 +355,14 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
continue
}
if atomic.LoadUint32(&client.canceled) == 1 {
log.Errorf("Client(id=%v) has stopped, skipped "+
"notification for event(pay_hash=%v)",
client.id, payHash)
continue
select {
case <-client.backlogDelivered:
// We won't deliver any events until the backlog has
// went through first.
case <-i.quit:
return
}
client.notify(event)
}
}
@@ -416,11 +418,19 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
}
select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
case <-client.backlogDelivered:
// We won't deliver any events until the backlog has
// been processed.
case <-i.quit:
return
}
err := client.notify(&invoiceEvent{
invoice: invoice,
setID: event.setID,
}:
case <-i.quit:
})
if err != nil {
log.Errorf("Failed dispatching to client: %v", err)
return
}
@@ -1389,7 +1399,12 @@ type invoiceSubscriptionKit struct {
canceled uint32 // To be used atomically.
cancelChan chan struct{}
wg sync.WaitGroup
// backlogDelivered is closed when the backlog events have been
// delivered.
backlogDelivered chan struct{}
wg sync.WaitGroup
}
// InvoiceSubscription represents an intent to receive updates for newly added
@@ -1452,6 +1467,10 @@ func (i *invoiceSubscriptionKit) Cancel() {
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
select {
case i.ntfnQueue.ChanIn() <- event:
case <-i.cancelChan:
// This can only be triggered by delivery of non-backlog
// events.
return ErrShuttingDown
case <-i.quit:
return ErrShuttingDown
}
@@ -1473,13 +1492,17 @@ func (i *InvoiceRegistry) SubscribeNotifications(
addIndex: addIndex,
settleIndex: settleIndex,
invoiceSubscriptionKit: invoiceSubscriptionKit{
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
backlogDelivered: make(chan struct{}),
},
}
client.ntfnQueue.Start()
// This notifies other goroutines that the backlog phase is over.
defer close(client.backlogDelivered)
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)
@@ -1567,14 +1590,18 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
client := &SingleInvoiceSubscription{
Updates: make(chan *channeldb.Invoice),
invoiceSubscriptionKit: invoiceSubscriptionKit{
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
backlogDelivered: make(chan struct{}),
},
invoiceRef: channeldb.InvoiceRefByHash(hash),
}
client.ntfnQueue.Start()
// This notifies other goroutines that the backlog phase is done.
defer close(client.backlogDelivered)
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)