Merge pull request #6477 from Crypt-iQ/testmultihopflake

invoices: add client to map before delivering backlog notif
This commit is contained in:
Olaoluwa Osuntokun
2022-05-11 16:45:36 -07:00
committed by GitHub
2 changed files with 53 additions and 27 deletions

View File

@@ -104,6 +104,11 @@ compact filters and block/block headers.
arbitrator relying on htlcswitch to be started
first](https://github.com/lightningnetwork/lnd/pull/6214).
* [Fixed an issue where invoice notifications could be missed when using the
SubscribeSingleInvoice or SubscribeNotifications rpcs.](https://github.com/lightningnetwork/lnd/pull/6477)
## Neutrino
* [Fixed crash in MuSig2Combine](https://github.com/lightningnetwork/lnd/pull/6502)
* [Added signature length

View File

@@ -355,12 +355,14 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
continue
}
if atomic.LoadUint32(&client.canceled) == 1 {
log.Errorf("Client(id=%v) has stopped, skipped "+
"notification for event(pay_hash=%v)",
client.id, payHash)
continue
select {
case <-client.backlogDelivered:
// We won't deliver any events until the backlog has
// went through first.
case <-i.quit:
return
}
client.notify(event)
}
}
@@ -416,11 +418,19 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
}
select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
case <-client.backlogDelivered:
// We won't deliver any events until the backlog has
// been processed.
case <-i.quit:
return
}
err := client.notify(&invoiceEvent{
invoice: invoice,
setID: event.setID,
}:
case <-i.quit:
})
if err != nil {
log.Errorf("Failed dispatching to client: %v", err)
return
}
@@ -1411,7 +1421,10 @@ type invoiceSubscriptionKit struct {
canceled uint32 // To be used atomically.
cancelChan chan struct{}
wg sync.WaitGroup
// backlogDelivered is closed when the backlog events have been
// delivered.
backlogDelivered chan struct{}
}
// InvoiceSubscription represents an intent to receive updates for newly added
@@ -1467,13 +1480,15 @@ func (i *invoiceSubscriptionKit) Cancel() {
i.ntfnQueue.Stop()
close(i.cancelChan)
i.wg.Wait()
}
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
select {
case i.ntfnQueue.ChanIn() <- event:
case <-i.cancelChan:
// This can only be triggered by delivery of non-backlog
// events.
return ErrShuttingDown
case <-i.quit:
return ErrShuttingDown
}
@@ -1495,13 +1510,17 @@ func (i *InvoiceRegistry) SubscribeNotifications(
addIndex: addIndex,
settleIndex: settleIndex,
invoiceSubscriptionKit: invoiceSubscriptionKit{
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
backlogDelivered: make(chan struct{}),
},
}
client.ntfnQueue.Start()
// This notifies other goroutines that the backlog phase is over.
defer close(client.backlogDelivered)
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)
@@ -1565,6 +1584,10 @@ func (i *InvoiceRegistry) SubscribeNotifications(
}
}()
i.Lock()
i.notificationClients[client.id] = client
i.Unlock()
// Query the database to see if based on the provided addIndex and
// settledIndex we need to deliver any backlog notifications.
err := i.deliverBacklogEvents(client)
@@ -1574,12 +1597,6 @@ func (i *InvoiceRegistry) SubscribeNotifications(
log.Infof("New invoice subscription client: id=%v", client.id)
i.Lock()
// With the backlog notifications delivered (if any), we'll add this to
// our active subscriptions.
i.notificationClients[client.id] = client
i.Unlock()
return client, nil
}
@@ -1591,14 +1608,18 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
client := &SingleInvoiceSubscription{
Updates: make(chan *channeldb.Invoice),
invoiceSubscriptionKit: invoiceSubscriptionKit{
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
backlogDelivered: make(chan struct{}),
},
invoiceRef: channeldb.InvoiceRefByHash(hash),
}
client.ntfnQueue.Start()
// This notifies other goroutines that the backlog phase is done.
defer close(client.backlogDelivered)
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)
@@ -1639,6 +1660,10 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
}
}()
i.Lock()
i.singleNotificationClients[client.id] = client
i.Unlock()
err := i.deliverSingleBacklogEvents(client)
if err != nil {
return nil, err
@@ -1647,10 +1672,6 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
log.Infof("New single invoice subscription client: id=%v, ref=%v",
client.id, client.invoiceRef)
i.Lock()
i.singleNotificationClients[client.id] = client
i.Unlock()
return client, nil
}