Merge pull request #6053 from yyforyongyu/itest-fix-invoice-registry

invoice: fix inactive client not cleaned from invoice registry
This commit is contained in:
Oliver Gugger 2022-01-03 13:21:01 +01:00 committed by GitHub
commit c2c96d907c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 108 deletions

View File

@ -10,6 +10,14 @@
* Add [auto-generated command-line completions](https://github.com/lightningnetwork/lnd/pull/4177)
for Fish shell.
## Bug Fixes
* [Fixed an inactive invoice subscription not removed from invoice
registry](https://github.com/lightningnetwork/lnd/pull/6053). When an invoice
subscription is created and canceled immediately, it could be left uncleaned
due to the cancel signal is processed before the creation. It is now properly
handled by moving creation before deletion.
## Misc
* [An example systemd service file](https://github.com/lightningnetwork/lnd/pull/6033)
@ -47,3 +55,4 @@
* ErikEk
* Liviu
* Torkel Rogstad
* Yong Yu

View File

@ -101,22 +101,22 @@ func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
type InvoiceRegistry struct {
sync.RWMutex
nextClientID uint32 // must be used atomically
cdb *channeldb.DB
// cfg contains the registry's configuration parameters.
cfg *RegistryConfig
clientMtx sync.Mutex
nextClientID uint32
notificationClients map[uint32]*InvoiceSubscription
notificationClients map[uint32]*InvoiceSubscription
// TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
// performance.
singleNotificationClients map[uint32]*SingleInvoiceSubscription
newSubscriptions chan *InvoiceSubscription
subscriptionCancels chan uint32
// invoiceEvents is a single channel over which both invoice updates and
// new single invoice subscriptions are carried.
invoiceEvents chan interface{}
// invoiceEvents is a single channel over which invoice updates are
// carried.
invoiceEvents chan *invoiceEvent
// subscriptions is a map from a circuit key to a list of subscribers.
// It is used for efficient notification of links.
@ -147,9 +147,7 @@ func NewRegistry(cdb *channeldb.DB, expiryWatcher *InvoiceExpiryWatcher,
cdb: cdb,
notificationClients: make(map[uint32]*InvoiceSubscription),
singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
newSubscriptions: make(chan *InvoiceSubscription),
subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan interface{}, 100),
invoiceEvents: make(chan *invoiceEvent, 100),
hodlSubscriptions: make(map[channeldb.CircuitKey]map[chan<- interface{}]struct{}),
hodlReverseSubscriptions: make(map[chan<- interface{}]map[channeldb.CircuitKey]struct{}),
cfg: cfg,
@ -301,61 +299,18 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
}
select {
// A new invoice subscription for all invoices has just arrived!
// We'll query for any backlog notifications, then add it to the
// set of clients.
case newClient := <-i.newSubscriptions:
log.Infof("New invoice subscription "+
"client: id=%v", newClient.id)
// With the backlog notifications delivered (if any),
// we'll add this to our active subscriptions and
// continue.
i.notificationClients[newClient.id] = newClient
// A client no longer wishes to receive invoice notifications.
// So we'll remove them from the set of active clients.
case clientID := <-i.subscriptionCancels:
log.Infof("Cancelling invoice subscription for "+
"client=%v", clientID)
delete(i.notificationClients, clientID)
delete(i.singleNotificationClients, clientID)
// An invoice event has come in. This can either be an update to
// an invoice or a new single invoice subscriber. Both type of
// events are passed in via the same channel, to make sure that
// subscribers get a consistent view of the event sequence.
// A sub-systems has just modified the invoice state, so we'll
// dispatch notifications to all registered clients.
case event := <-i.invoiceEvents:
switch e := event.(type) {
// For backwards compatibility, do not notify all
// invoice subscribers of cancel and accept events.
state := event.invoice.State
if state != channeldb.ContractCanceled &&
state != channeldb.ContractAccepted {
// A sub-systems has just modified the invoice state, so
// we'll dispatch notifications to all registered
// clients.
case *invoiceEvent:
// For backwards compatibility, do not notify
// all invoice subscribers of cancel and accept
// events.
state := e.invoice.State
if state != channeldb.ContractCanceled &&
state != channeldb.ContractAccepted {
i.dispatchToClients(e)
}
i.dispatchToSingleClients(e)
// A new single invoice subscription has arrived. Add it
// to the set of clients. It is important to do this in
// sequence with any other invoice events, because an
// initial invoice update has already been sent out to
// the subscriber.
case *SingleInvoiceSubscription:
log.Infof("New single invoice subscription "+
"client: id=%v, ref=%v", e.id,
e.invoiceRef)
i.singleNotificationClients[e.id] = e
i.dispatchToClients(event)
}
i.dispatchToSingleClients(event)
// A new htlc came in for auto-release.
case event := <-i.htlcAutoReleaseChan:
@ -386,27 +341,36 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
}
}
// dispatchToSingleClients passes the supplied event to all notification clients
// that subscribed to all the invoice this event applies to.
// dispatchToSingleClients passes the supplied event to all notification
// clients that subscribed to all the invoice this event applies to.
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
// Dispatch to single invoice subscribers.
for _, client := range i.singleNotificationClients {
clients := i.copySingleClients()
for _, client := range clients {
payHash := client.invoiceRef.PayHash()
if payHash == nil || *payHash != event.hash {
continue
}
if atomic.LoadUint32(&client.canceled) == 1 {
log.Errorf("Client(id=%v) has stopped, skipped "+
"notification for event(pay_hash=%v)",
client.id, payHash)
continue
}
client.notify(event)
}
}
// dispatchToClients passes the supplied event to all notification clients that
// subscribed to all invoices. Add and settle indices are used to make sure that
// clients don't receive duplicate or unwanted events.
// subscribed to all invoices. Add and settle indices are used to make sure
// that clients don't receive duplicate or unwanted events.
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
invoice := event.invoice
for clientID, client := range i.notificationClients {
clients := i.copyClients()
for clientID, client := range clients {
// Before we dispatch this event, we'll check
// to ensure that this client hasn't already
// received this notification in order to
@ -568,6 +532,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents(
return err
}
log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
client.id, payHash)
return nil
}
@ -1404,8 +1371,11 @@ func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
// invoiceSubscriptionKit defines that are common to both all invoice
// subscribers and single invoice subscribers.
type invoiceSubscriptionKit struct {
id uint32
inv *InvoiceRegistry
id uint32 // nolint:structcheck
// quit is a chan mouted to InvoiceRegistry that signals a shutdown.
quit chan struct{}
ntfnQueue *queue.ConcurrentQueue
canceled uint32 // To be used atomically.
@ -1464,11 +1434,6 @@ func (i *invoiceSubscriptionKit) Cancel() {
return
}
select {
case i.inv.subscriptionCancels <- i.id:
case <-i.inv.quit:
}
i.ntfnQueue.Stop()
close(i.cancelChan)
@ -1478,7 +1443,7 @@ func (i *invoiceSubscriptionKit) Cancel() {
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
select {
case i.ntfnQueue.ChanIn() <- event:
case <-i.inv.quit:
case <-i.quit:
return ErrShuttingDown
}
@ -1499,17 +1464,16 @@ func (i *InvoiceRegistry) SubscribeNotifications(
addIndex: addIndex,
settleIndex: settleIndex,
invoiceSubscriptionKit: invoiceSubscriptionKit{
inv: i,
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
},
}
client.ntfnQueue.Start()
i.clientMtx.Lock()
client.id = i.nextClientID
i.nextClientID++
i.clientMtx.Unlock()
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)
// Before we register this new invoice subscription, we'll launch a new
// goroutine that will proxy all notifications appended to the end of
@ -1518,6 +1482,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(
i.wg.Add(1)
go func() {
defer i.wg.Done()
defer i.deleteClient(client.id)
for {
select {
@ -1569,9 +1534,6 @@ func (i *InvoiceRegistry) SubscribeNotifications(
}
}()
i.Lock()
defer i.Unlock()
// Query the database to see if based on the provided addIndex and
// settledIndex we need to deliver any backlog notifications.
err := i.deliverBacklogEvents(client)
@ -1579,11 +1541,13 @@ func (i *InvoiceRegistry) SubscribeNotifications(
return nil, err
}
select {
case i.newSubscriptions <- client:
case <-i.quit:
return nil, ErrShuttingDown
}
log.Infof("New invoice subscription client: id=%v", client.id)
i.Lock()
// With the backlog notifications delivered (if any), we'll add this to
// our active subscriptions.
i.notificationClients[client.id] = client
i.Unlock()
return client, nil
}
@ -1596,7 +1560,7 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
client := &SingleInvoiceSubscription{
Updates: make(chan *channeldb.Invoice),
invoiceSubscriptionKit: invoiceSubscriptionKit{
inv: i,
quit: i.quit,
ntfnQueue: queue.NewConcurrentQueue(20),
cancelChan: make(chan struct{}),
},
@ -1604,10 +1568,9 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
}
client.ntfnQueue.Start()
i.clientMtx.Lock()
client.id = i.nextClientID
i.nextClientID++
i.clientMtx.Unlock()
// Always increment by 1 first, and our client ID will start with 1,
// not 0.
client.id = atomic.AddUint32(&i.nextClientID, 1)
// Before we register this new invoice subscription, we'll launch a new
// goroutine that will proxy all notifications appended to the end of
@ -1616,6 +1579,7 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
i.wg.Add(1)
go func() {
defer i.wg.Done()
defer i.deleteClient(client.id)
for {
select {
@ -1644,22 +1608,17 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
}
}()
// Within the lock, we both query the invoice state and pass the client
// subscription to the invoiceEvents channel. This is to make sure that
// the client receives a consistent stream of events.
i.Lock()
defer i.Unlock()
err := i.deliverSingleBacklogEvents(client)
if err != nil {
return nil, err
}
select {
case i.invoiceEvents <- client:
case <-i.quit:
return nil, ErrShuttingDown
}
log.Infof("New single invoice subscription client: id=%v, ref=%v",
client.id, client.invoiceRef)
i.Lock()
i.singleNotificationClients[client.id] = client
i.Unlock()
return client, nil
}
@ -1724,3 +1683,40 @@ func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
delete(i.hodlReverseSubscriptions, subscriber)
}
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
// useful when we need to iterate the map to send notifications.
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription {
i.RLock()
defer i.RUnlock()
clients := make(map[uint32]*SingleInvoiceSubscription)
for k, v := range i.singleNotificationClients {
clients[k] = v
}
return clients
}
// copyClients copies i.notificationClients inside a lock. This is useful when
// we need to iterate the map to send notifications.
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
i.RLock()
defer i.RUnlock()
clients := make(map[uint32]*InvoiceSubscription)
for k, v := range i.notificationClients {
clients[k] = v
}
return clients
}
// deleteClient removes a client by its ID inside a lock. Noop if the client is
// not found.
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
i.Lock()
defer i.Unlock()
log.Infof("Cancelling invoice subscription for client=%v", clientID)
delete(i.notificationClients, clientID)
delete(i.singleNotificationClients, clientID)
}

View File

@ -244,6 +244,8 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
}
defer invoiceClient.Cancel()
log.Debugf("Created new single invoice(pay_hash=%v) subscription", hash)
for {
select {
case newInvoice := <-invoiceClient.Updates:
@ -265,7 +267,9 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
}
case <-updateStream.Context().Done():
return updateStream.Context().Err()
return fmt.Errorf("subscription for "+
"invoice(pay_hash=%v): %w", hash,
updateStream.Context().Err())
case <-s.quit:
return nil