diff --git a/invoiceregistry.go b/invoiceregistry.go index 283c5359e..a49a85817 100644 --- a/invoiceregistry.go +++ b/invoiceregistry.go @@ -3,10 +3,13 @@ package main import ( "bytes" "crypto/sha256" + "fmt" "sync" + "sync/atomic" "time" "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/zpay32" @@ -37,10 +40,17 @@ type invoiceRegistry struct { nextClientID uint32 notificationClients map[uint32]*invoiceSubscription + newSubscriptions chan *invoiceSubscription + subscriptionCancels chan uint32 + invoiceEvents chan *invoiceEvent + // debugInvoices is a map which stores special "debug" invoices which // should be only created/used when manual tests require an invoice // that *all* nodes are able to fully settle. debugInvoices map[chainhash.Hash]*channeldb.Invoice + + wg sync.WaitGroup + quit chan struct{} } // newInvoiceRegistry creates a new invoice registry. The invoice registry @@ -52,9 +62,137 @@ func newInvoiceRegistry(cdb *channeldb.DB) *invoiceRegistry { cdb: cdb, debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice), notificationClients: make(map[uint32]*invoiceSubscription), + newSubscriptions: make(chan *invoiceSubscription), + subscriptionCancels: make(chan uint32), + invoiceEvents: make(chan *invoiceEvent), + quit: make(chan struct{}), } } +// Start starts the registry and all goroutines it needs to carry out its task. +func (i *invoiceRegistry) Start() error { + i.wg.Add(1) + + go i.invoiceEventNotifier() + + return nil +} + +// Stop signals the registry for a graceful shutdown. +func (i *invoiceRegistry) Stop() { + close(i.quit) + + i.wg.Wait() +} + +// invoiceEvent represents a new event that has modified on invoice on disk. +// Only two event types are currently supported: newly created invoices, and +// instance where invoices are settled. +type invoiceEvent struct { + isSettle bool + + invoice *channeldb.Invoice +} + +// invoiceEventNotifier is the dedicated goroutine responsible for accepting +// new notification subscriptions, cancelling old subscriptions, and +// dispatching new invoice events. +func (i *invoiceRegistry) invoiceEventNotifier() { + defer i.wg.Done() + + for { + select { + // A new invoice subscription has just arrived! We'll query for + // any backlog notifications, then add it to the set of + // clients. + case newClient := <-i.newSubscriptions: + // Before we add the client to our set of active + // clients, we'll first attempt to deliver any backlog + // invoice events. + err := i.deliverBacklogEvents(newClient) + if err != nil { + ltndLog.Errorf("unable to deliver backlog invoice "+ + "notifications: %v", err) + } + + ltndLog.Infof("New invoice subscription "+ + "client: id=%v", newClient.id) + + // With the backlog notifications delivered (if any), + // we'll add this to our active subscriptions and + // continue. + i.notificationClients[newClient.id] = newClient + + // A client no longer wishes to receive invoice notifications. + // So we'll remove them from the set of active clients. + case clientID := <-i.subscriptionCancels: + ltndLog.Infof("Cancelling invoice subscription for "+ + "client=%v", clientID) + + delete(i.notificationClients, clientID) + + // A sub-systems has just modified the invoice state, so we'll + // dispatch notifications to all registered clients. + case event := <-i.invoiceEvents: + for _, client := range i.notificationClients { + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + isSettle: event.isSettle, + invoice: event.invoice, + }: + case <-i.quit: + return + } + } + + case <-i.quit: + return + } + } +} + +// deliverBacklogEvents will attempts to query the invoice database for any +// notifications that the client has missed since it reconnected last. +func (i *invoiceRegistry) deliverBacklogEvents(client *invoiceSubscription) error { + // First, we'll query the database to see if based on the provided + // addIndex and settledIndex we need to deliver any backlog + // notifications. + addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex) + if err != nil { + return err + } + settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex) + if err != nil { + return err + } + + // If we have any to deliver, then we'll append them to the end of the + // notification queue in order to catch up the client before delivering + // any new notifications. + for _, addEvent := range addEvents { + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + isSettle: false, + invoice: &addEvent, + }: + case <-i.quit: + return fmt.Errorf("registry shutting down") + } + } + for _, settleEvent := range settleEvents { + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + isSettle: true, + invoice: &settleEvent, + }: + case <-i.quit: + return fmt.Errorf("registry shutting down") + } + } + + return nil +} + // AddDebugInvoice adds a debug invoice for the specified amount, identified // by the passed preimage. Once this invoice is added, subsystems within the // daemon add/forward HTLCs that are able to obtain the proper preimage @@ -82,18 +220,22 @@ func (i *invoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash // AddInvoice adds a regular invoice for the specified amount, identified by // the passed preimage. Additionally, any memo or receipt data provided will // also be stored on-disk. Once this invoice is added, subsystems within the -// daemon add/forward HTLCs are able to obtain the proper preimage required -// for redemption in the case that we're the final destination. +// daemon add/forward HTLCs are able to obtain the proper preimage required for +// redemption in the case that we're the final destination. func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) error { ltndLog.Debugf("Adding invoice %v", newLogClosure(func() string { return spew.Sdump(invoice) })) - // TODO(roasbeef): also check in memory for quick lookups/settles? - return i.cdb.AddInvoice(invoice) + if err := i.cdb.AddInvoice(invoice); err != nil { + return err + } - // TODO(roasbeef): re-enable? - //go i.notifyClients(invoice, false) + // We'll launch a new goroutine to notify all of our active listeners + // that a new invoice was just added. + i.notifyClients(invoice, false) + + return nil } // LookupInvoice looks up an invoice by its payment hash (R-Hash), if found @@ -107,12 +249,12 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice // First check the in-memory debug invoice index to see if this is an // existing invoice added for debugging. i.RLock() - invoice, ok := i.debugInvoices[rHash] + debugInv, ok := i.debugInvoices[rHash] i.RUnlock() // If found, then simply return the invoice directly. if ok { - return *invoice, 0, nil + return *debugInv, 0, nil } // Otherwise, we'll check the database to see if there's an existing @@ -135,7 +277,9 @@ func (i *invoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice // SettleInvoice attempts to mark an invoice as settled. If the invoice is a // debug invoice, then this method is a noop as debug invoices are never fully // settled. -func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash) error { +func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash, + amtPaid lnwire.MilliSatoshi) error { + ltndLog.Debugf("Settling invoice %x", rHash[:]) // First check the in-memory debug invoice index to see if this is an @@ -152,7 +296,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash) error { // If this isn't a debug invoice, then we'll attempt to settle an // invoice matching this rHash on disk (if one exists). - if err := i.cdb.SettleInvoice(rHash); err != nil { + if err := i.cdb.SettleInvoice(rHash, amtPaid); err != nil { return err } @@ -167,7 +311,7 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash) error { ltndLog.Infof("Payment received: %v", spew.Sdump(invoice)) - i.notifyClients(invoice, true) + i.notifyClients(&invoice, true) }() return nil @@ -176,20 +320,14 @@ func (i *invoiceRegistry) SettleInvoice(rHash chainhash.Hash) error { // notifyClients notifies all currently registered invoice notification clients // of a newly added/settled invoice. func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice, settle bool) { - i.clientMtx.Lock() - defer i.clientMtx.Unlock() + event := &invoiceEvent{ + isSettle: settle, + invoice: invoice, + } - for _, client := range i.notificationClients { - var eventChan chan *channeldb.Invoice - if settle { - eventChan = client.SettledInvoices - } else { - eventChan = client.NewInvoices - } - - go func() { - eventChan <- invoice - }() + select { + case i.invoiceEvents <- event: + case <-i.quit: } } @@ -199,36 +337,127 @@ func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice, settle bool) // invoice, a copy of the invoice will be sent over the SettledInvoices // channel. type invoiceSubscription struct { - NewInvoices chan *channeldb.Invoice + cancelled uint32 // To be used atomically. + + // NewInvoices is a channel that we'll use to send all newly created + // invoices with an invoice index greater than the specified + // StartingInvoiceIndex field. + NewInvoices chan *channeldb.Invoice + + // SettledInvoices is a channel that we'll use to send all setted + // invoices with an invoices index greater than the specified + // StartingInvoiceIndex field. SettledInvoices chan *channeldb.Invoice + // addIndex is the highest add index the caller knows of. We'll use + // this information to send out an event backlog to the notifications + // subscriber. Any new add events with an index greater than this will + // be dispatched before any new notifications are sent out. + addIndex uint64 + + // settleIndex is the highest settle index the caller knows of. We'll + // use this information to send out an event backlog to the + // notifications subscriber. Any new settle events with an index + // greater than this will be dispatched before any new notifications + // are sent out. + settleIndex uint64 + + ntfnQueue *chainntnfs.ConcurrentQueue + + id uint32 + inv *invoiceRegistry - id uint32 + + cancelChan chan struct{} + + wg sync.WaitGroup } // Cancel unregisters the invoiceSubscription, freeing any previously allocated // resources. func (i *invoiceSubscription) Cancel() { - i.inv.clientMtx.Lock() - delete(i.inv.notificationClients, i.id) - i.inv.clientMtx.Unlock() + if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) { + return + } + + select { + case i.inv.subscriptionCancels <- i.id: + case <-i.inv.quit: + } + + i.ntfnQueue.Stop() + close(i.cancelChan) + + i.wg.Wait() } // SubscribeNotifications returns an invoiceSubscription which allows the // caller to receive async notifications when any invoices are settled or -// added. -func (i *invoiceRegistry) SubscribeNotifications() *invoiceSubscription { +// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start +// by first sending out all new events with an invoice index _greater_ than +// this value. Afterwards, we'll send out real-time notifications. +func (i *invoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *invoiceSubscription { client := &invoiceSubscription{ NewInvoices: make(chan *channeldb.Invoice), SettledInvoices: make(chan *channeldb.Invoice), + addIndex: addIndex, + settleIndex: settleIndex, inv: i, + ntfnQueue: chainntnfs.NewConcurrentQueue(20), + cancelChan: make(chan struct{}), } + client.ntfnQueue.Start() i.clientMtx.Lock() - i.notificationClients[i.nextClientID] = client client.id = i.nextClientID i.nextClientID++ i.clientMtx.Unlock() + // Before we register this new invoice subscription, we'll launch a new + // goroutine that will proxy all notifications appended to the end of + // the concurrent queue to the two client-side channels the caller will + // feed off of. + i.wg.Add(1) + go func() { + defer i.wg.Done() + + for { + select { + // A new invoice event has been sent by the + // invoiceRegistry! We'll figure out if this is an add + // event or a settle event, then dispatch the event to + // the client. + case ntfn := <-client.ntfnQueue.ChanOut(): + invoiceEvent := ntfn.(*invoiceEvent) + + targetChan := client.NewInvoices + if invoiceEvent.isSettle { + targetChan = client.SettledInvoices + } + + select { + case targetChan <- invoiceEvent.invoice: + + case <-client.cancelChan: + return + + case <-i.quit: + return + } + + case <-client.cancelChan: + return + + case <-i.quit: + return + } + } + }() + + select { + case i.newSubscriptions <- client: + case <-i.quit: + } + return client }