From 6a02fa1107b82a69afba1fed16bdd362e06c771a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 May 2020 20:31:52 -0700 Subject: [PATCH] invoices/invoiceregistry: properly synchronize backlog This commit moves the db calls for retrieving add and settle backlogs outide of the main event loop. All other db operations are performed outside of the event loop and synchronized via the invoice registry's mutex, which also synchronizes the order in which events submitted to be processed. This resolves various concurrency issues where notifications can be missed of inconsistent reads against the databse. This is especially important in this case because we are actually making two separate database calls. --- invoices/invoiceregistry.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index b0e06a16b..520e75823 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -234,15 +234,6 @@ func (i *InvoiceRegistry) invoiceEventLoop() { // We'll query for any backlog notifications, then add it to the // set of clients. case newClient := <-i.newSubscriptions: - // Before we add the client to our set of active - // clients, we'll first attempt to deliver any backlog - // invoice events. - err := i.deliverBacklogEvents(newClient) - if err != nil { - log.Errorf("unable to deliver backlog invoice "+ - "notifications: %v", err) - } - log.Infof("New invoice subscription "+ "client: id=%v", newClient.id) @@ -410,9 +401,6 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { // deliverBacklogEvents will attempts to query the invoice database for any // notifications that the client has missed since it reconnected last. func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error { - // First, we'll query the database to see if based on the provided - // addIndex and settledIndex we need to deliver any backlog - // notifications. addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex) if err != nil { return err @@ -1253,6 +1241,16 @@ func (i *InvoiceRegistry) SubscribeNotifications( } }() + i.Lock() + defer i.Unlock() + + // Query the database to see if based on the provided addIndex and + // settledIndex we need to deliver any backlog notifications. + err := i.deliverBacklogEvents(client) + if err != nil { + return nil, err + } + select { case i.newSubscriptions <- client: case <-i.quit: