From 93f87acb5910c68daa4943e2a9bc447f8cbee268 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 4 May 2022 17:38:07 -0400 Subject: [PATCH 1/4] invoices: add client to map before delivering backlog notif Prior to this change, if SubscribeSingleInvoice or SubscribeNotifications was called, it was possible that a state change would never be delivered to the client. The sequence of events would be: - delivery of backlog events with invoice in the Open state - invoice goes to the Accepted state, no client to notify - client added to map This is fixed by adding the client to the map first. However, with this change alone it then becomes possible for notifications to be delivered out of order. This is addressed in a following commit. --- invoices/invoiceregistry.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 6505e259e..5c158c7e6 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -1543,6 +1543,10 @@ func (i *InvoiceRegistry) SubscribeNotifications( } }() + i.Lock() + i.notificationClients[client.id] = client + i.Unlock() + // Query the database to see if based on the provided addIndex and // settledIndex we need to deliver any backlog notifications. err := i.deliverBacklogEvents(client) @@ -1552,12 +1556,6 @@ func (i *InvoiceRegistry) SubscribeNotifications( log.Infof("New invoice subscription client: id=%v", client.id) - i.Lock() - // With the backlog notifications delivered (if any), we'll add this to - // our active subscriptions. - i.notificationClients[client.id] = client - i.Unlock() - return client, nil } @@ -1617,6 +1615,10 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( } }() + i.Lock() + i.singleNotificationClients[client.id] = client + i.Unlock() + err := i.deliverSingleBacklogEvents(client) if err != nil { return nil, err @@ -1625,10 +1627,6 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( log.Infof("New single invoice subscription client: id=%v, ref=%v", client.id, client.invoiceRef) - i.Lock() - i.singleNotificationClients[client.id] = client - i.Unlock() - return client, nil } From 5ddad90a17dee6c943c81c4be474e3c3792791d1 Mon Sep 17 00:00:00 2001 From: eugene Date: Wed, 4 May 2022 17:46:13 -0400 Subject: [PATCH 2/4] invoices: fix OOO notifications, use cancelChan instead of canceled This commit fixes OOO notifications between backlog and non-backlog events by having the non-backlog goroutines wait on a chan that signals that backlog processing is complete. This commit also replaces usage of the canceled atomic variable with the cancelChan to signal that delivery of an event should no longer occur. Atomics do not make perfect "sequence" points as the atomic may be checked too early and the end-result of delivering to a stopped ntfnQueue is the same. Using the cancelChan ensures that we do not hang on sending to ntfnQueue. --- invoices/invoiceregistry.go | 57 +++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 5c158c7e6..99ed3aefd 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -355,12 +355,14 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { continue } - if atomic.LoadUint32(&client.canceled) == 1 { - log.Errorf("Client(id=%v) has stopped, skipped "+ - "notification for event(pay_hash=%v)", - client.id, payHash) - continue + select { + case <-client.backlogDelivered: + // We won't deliver any events until the backlog has + // went through first. + case <-i.quit: + return } + client.notify(event) } } @@ -416,11 +418,19 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { } select { - case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + case <-client.backlogDelivered: + // We won't deliver any events until the backlog has + // been processed. + case <-i.quit: + return + } + + err := client.notify(&invoiceEvent{ invoice: invoice, setID: event.setID, - }: - case <-i.quit: + }) + if err != nil { + log.Errorf("Failed dispatching to client: %v", err) return } @@ -1389,7 +1399,12 @@ type invoiceSubscriptionKit struct { canceled uint32 // To be used atomically. cancelChan chan struct{} - wg sync.WaitGroup + + // backlogDelivered is closed when the backlog events have been + // delivered. + backlogDelivered chan struct{} + + wg sync.WaitGroup } // InvoiceSubscription represents an intent to receive updates for newly added @@ -1452,6 +1467,10 @@ func (i *invoiceSubscriptionKit) Cancel() { func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error { select { case i.ntfnQueue.ChanIn() <- event: + case <-i.cancelChan: + // This can only be triggered by delivery of non-backlog + // events. + return ErrShuttingDown case <-i.quit: return ErrShuttingDown } @@ -1473,13 +1492,17 @@ func (i *InvoiceRegistry) SubscribeNotifications( addIndex: addIndex, settleIndex: settleIndex, invoiceSubscriptionKit: invoiceSubscriptionKit{ - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + backlogDelivered: make(chan struct{}), }, } client.ntfnQueue.Start() + // This notifies other goroutines that the backlog phase is over. + defer close(client.backlogDelivered) + // Always increment by 1 first, and our client ID will start with 1, // not 0. client.id = atomic.AddUint32(&i.nextClientID, 1) @@ -1567,14 +1590,18 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( client := &SingleInvoiceSubscription{ Updates: make(chan *channeldb.Invoice), invoiceSubscriptionKit: invoiceSubscriptionKit{ - quit: i.quit, - ntfnQueue: queue.NewConcurrentQueue(20), - cancelChan: make(chan struct{}), + quit: i.quit, + ntfnQueue: queue.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), + backlogDelivered: make(chan struct{}), }, invoiceRef: channeldb.InvoiceRefByHash(hash), } client.ntfnQueue.Start() + // This notifies other goroutines that the backlog phase is done. + defer close(client.backlogDelivered) + // Always increment by 1 first, and our client ID will start with 1, // not 0. client.id = atomic.AddUint32(&i.nextClientID, 1) From d0999913396cb5ea7a639f09f09387ef757e6e1a Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 5 May 2022 11:06:08 -0400 Subject: [PATCH 3/4] invoices: remove unused invoiceSubscriptionKit wait group --- invoices/invoiceregistry.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 99ed3aefd..6c26e5b0c 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -1403,8 +1403,6 @@ type invoiceSubscriptionKit struct { // backlogDelivered is closed when the backlog events have been // delivered. backlogDelivered chan struct{} - - wg sync.WaitGroup } // InvoiceSubscription represents an intent to receive updates for newly added @@ -1460,8 +1458,6 @@ func (i *invoiceSubscriptionKit) Cancel() { i.ntfnQueue.Stop() close(i.cancelChan) - - i.wg.Wait() } func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error { From 3a5d2804bbf09e2b2e374cda56787dade311f709 Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 5 May 2022 11:06:24 -0400 Subject: [PATCH 4/4] release-notes: update for 0.15.0 --- docs/release-notes/release-notes-0.15.0.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.15.0.md b/docs/release-notes/release-notes-0.15.0.md index 3e07c9ef6..3bc4290b1 100644 --- a/docs/release-notes/release-notes-0.15.0.md +++ b/docs/release-notes/release-notes-0.15.0.md @@ -104,6 +104,11 @@ compact filters and block/block headers. arbitrator relying on htlcswitch to be started first](https://github.com/lightningnetwork/lnd/pull/6214). +* [Fixed an issue where invoice notifications could be missed when using the + SubscribeSingleInvoice or SubscribeNotifications rpcs.](https://github.com/lightningnetwork/lnd/pull/6477) + +## Neutrino + * [Fixed crash in MuSig2Combine](https://github.com/lightningnetwork/lnd/pull/6502) * [Added signature length