mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-30 23:53:41 +02:00
invoices: create package
This commit isolates the invoice registry in a separate package. It is a preparation for the creation of an invoices sub server.
This commit is contained in:
536
invoices/invoiceregistry.go
Normal file
536
invoices/invoiceregistry.go
Normal file
@@ -0,0 +1,536 @@
|
||||
package invoices
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcutil"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/queue"
|
||||
"github.com/lightningnetwork/lnd/zpay32"
|
||||
)
|
||||
|
||||
var (
|
||||
// DebugPre is the default debug preimage which is inserted into the
|
||||
// invoice registry if the --debughtlc flag is activated on start up.
|
||||
// All nodes initialized with the flag active will immediately settle
|
||||
// any incoming HTLC whose rHash corresponds with the debug
|
||||
// preimage.
|
||||
DebugPre, _ = chainhash.NewHash(bytes.Repeat([]byte{1}, 32))
|
||||
|
||||
// DebugHash is the hash of the default preimage.
|
||||
DebugHash = chainhash.Hash(sha256.Sum256(DebugPre[:]))
|
||||
)
|
||||
|
||||
// InvoiceRegistry is a central registry of all the outstanding invoices
|
||||
// created by the daemon. The registry is a thin wrapper around a map in order
|
||||
// to ensure that all updates/reads are thread safe.
|
||||
type InvoiceRegistry struct {
|
||||
sync.RWMutex
|
||||
|
||||
cdb *channeldb.DB
|
||||
|
||||
clientMtx sync.Mutex
|
||||
nextClientID uint32
|
||||
notificationClients map[uint32]*InvoiceSubscription
|
||||
|
||||
newSubscriptions chan *InvoiceSubscription
|
||||
subscriptionCancels chan uint32
|
||||
invoiceEvents chan *invoiceEvent
|
||||
|
||||
// debugInvoices is a map which stores special "debug" invoices which
|
||||
// should be only created/used when manual tests require an invoice
|
||||
// that *all* nodes are able to fully settle.
|
||||
debugInvoices map[chainhash.Hash]*channeldb.Invoice
|
||||
|
||||
activeNetParams *chaincfg.Params
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewRegistry creates a new invoice registry. The invoice registry
|
||||
// wraps the persistent on-disk invoice storage with an additional in-memory
|
||||
// layer. The in-memory layer is in place such that debug invoices can be added
|
||||
// which are volatile yet available system wide within the daemon.
|
||||
func NewRegistry(cdb *channeldb.DB,
|
||||
activeNetParams *chaincfg.Params) *InvoiceRegistry {
|
||||
|
||||
return &InvoiceRegistry{
|
||||
cdb: cdb,
|
||||
debugInvoices: make(map[chainhash.Hash]*channeldb.Invoice),
|
||||
notificationClients: make(map[uint32]*InvoiceSubscription),
|
||||
newSubscriptions: make(chan *InvoiceSubscription),
|
||||
subscriptionCancels: make(chan uint32),
|
||||
invoiceEvents: make(chan *invoiceEvent, 100),
|
||||
activeNetParams: activeNetParams,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the registry and all goroutines it needs to carry out its task.
|
||||
func (i *InvoiceRegistry) Start() error {
|
||||
i.wg.Add(1)
|
||||
|
||||
go i.invoiceEventNotifier()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop signals the registry for a graceful shutdown.
|
||||
func (i *InvoiceRegistry) Stop() {
|
||||
close(i.quit)
|
||||
|
||||
i.wg.Wait()
|
||||
}
|
||||
|
||||
// invoiceEvent represents a new event that has modified on invoice on disk.
|
||||
// Only two event types are currently supported: newly created invoices, and
|
||||
// instance where invoices are settled.
|
||||
type invoiceEvent struct {
|
||||
state channeldb.ContractState
|
||||
invoice *channeldb.Invoice
|
||||
}
|
||||
|
||||
// invoiceEventNotifier is the dedicated goroutine responsible for accepting
|
||||
// new notification subscriptions, cancelling old subscriptions, and
|
||||
// dispatching new invoice events.
|
||||
func (i *InvoiceRegistry) invoiceEventNotifier() {
|
||||
defer i.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
// A new invoice subscription has just arrived! We'll query for
|
||||
// any backlog notifications, then add it to the set of
|
||||
// clients.
|
||||
case newClient := <-i.newSubscriptions:
|
||||
// Before we add the client to our set of active
|
||||
// clients, we'll first attempt to deliver any backlog
|
||||
// invoice events.
|
||||
err := i.deliverBacklogEvents(newClient)
|
||||
if err != nil {
|
||||
log.Errorf("unable to deliver backlog invoice "+
|
||||
"notifications: %v", err)
|
||||
}
|
||||
|
||||
log.Infof("New invoice subscription "+
|
||||
"client: id=%v", newClient.id)
|
||||
|
||||
// With the backlog notifications delivered (if any),
|
||||
// we'll add this to our active subscriptions and
|
||||
// continue.
|
||||
i.notificationClients[newClient.id] = newClient
|
||||
|
||||
// A client no longer wishes to receive invoice notifications.
|
||||
// So we'll remove them from the set of active clients.
|
||||
case clientID := <-i.subscriptionCancels:
|
||||
log.Infof("Cancelling invoice subscription for "+
|
||||
"client=%v", clientID)
|
||||
|
||||
delete(i.notificationClients, clientID)
|
||||
|
||||
// A sub-systems has just modified the invoice state, so we'll
|
||||
// dispatch notifications to all registered clients.
|
||||
case event := <-i.invoiceEvents:
|
||||
for clientID, client := range i.notificationClients {
|
||||
// Before we dispatch this event, we'll check
|
||||
// to ensure that this client hasn't already
|
||||
// received this notification in order to
|
||||
// ensure we don't duplicate any events.
|
||||
invoice := event.invoice
|
||||
switch {
|
||||
// If we've already sent this settle event to
|
||||
// the client, then we can skip this.
|
||||
case event.state == channeldb.ContractSettled &&
|
||||
client.settleIndex >= invoice.SettleIndex:
|
||||
continue
|
||||
|
||||
// Similarly, if we've already sent this add to
|
||||
// the client then we can skip this one.
|
||||
case event.state == channeldb.ContractOpen &&
|
||||
client.addIndex >= invoice.AddIndex:
|
||||
continue
|
||||
|
||||
// These two states should never happen, but we
|
||||
// log them just in case so we can detect this
|
||||
// instance.
|
||||
case event.state == channeldb.ContractOpen &&
|
||||
client.addIndex+1 != invoice.AddIndex:
|
||||
log.Warnf("client=%v for invoice "+
|
||||
"notifications missed an update, "+
|
||||
"add_index=%v, new add event index=%v",
|
||||
clientID, client.addIndex,
|
||||
invoice.AddIndex)
|
||||
case event.state == channeldb.ContractSettled &&
|
||||
client.settleIndex+1 != invoice.SettleIndex:
|
||||
log.Warnf("client=%v for invoice "+
|
||||
"notifications missed an update, "+
|
||||
"settle_index=%v, new settle event index=%v",
|
||||
clientID, client.settleIndex,
|
||||
invoice.SettleIndex)
|
||||
}
|
||||
|
||||
select {
|
||||
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
|
||||
state: event.state,
|
||||
invoice: invoice,
|
||||
}:
|
||||
case <-i.quit:
|
||||
return
|
||||
}
|
||||
|
||||
// Each time we send a notification to a
|
||||
// client, we'll record the latest add/settle
|
||||
// index it has. We'll use this to ensure we
|
||||
// don't send a notification twice, which can
|
||||
// happen if a new event is added while we're
|
||||
// catching up a new client.
|
||||
switch event.state {
|
||||
case channeldb.ContractSettled:
|
||||
client.settleIndex = invoice.SettleIndex
|
||||
case channeldb.ContractOpen:
|
||||
client.addIndex = invoice.AddIndex
|
||||
default:
|
||||
log.Errorf("unknown invoice "+
|
||||
"state: %v", event.state)
|
||||
}
|
||||
}
|
||||
|
||||
case <-i.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deliverBacklogEvents will attempts to query the invoice database for any
|
||||
// notifications that the client has missed since it reconnected last.
|
||||
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
|
||||
// First, we'll query the database to see if based on the provided
|
||||
// addIndex and settledIndex we need to deliver any backlog
|
||||
// notifications.
|
||||
addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If we have any to deliver, then we'll append them to the end of the
|
||||
// notification queue in order to catch up the client before delivering
|
||||
// any new notifications.
|
||||
for _, addEvent := range addEvents {
|
||||
// We re-bind the loop variable to ensure we don't hold onto
|
||||
// the loop reference causing is to point to the same item.
|
||||
addEvent := addEvent
|
||||
|
||||
select {
|
||||
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
|
||||
state: channeldb.ContractOpen,
|
||||
invoice: &addEvent,
|
||||
}:
|
||||
case <-i.quit:
|
||||
return fmt.Errorf("registry shutting down")
|
||||
}
|
||||
}
|
||||
for _, settleEvent := range settleEvents {
|
||||
// We re-bind the loop variable to ensure we don't hold onto
|
||||
// the loop reference causing is to point to the same item.
|
||||
settleEvent := settleEvent
|
||||
|
||||
select {
|
||||
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
|
||||
state: channeldb.ContractSettled,
|
||||
invoice: &settleEvent,
|
||||
}:
|
||||
case <-i.quit:
|
||||
return fmt.Errorf("registry shutting down")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddDebugInvoice adds a debug invoice for the specified amount, identified
|
||||
// by the passed preimage. Once this invoice is added, subsystems within the
|
||||
// daemon add/forward HTLCs that are able to obtain the proper preimage
|
||||
// required for redemption in the case that we're the final destination.
|
||||
func (i *InvoiceRegistry) AddDebugInvoice(amt btcutil.Amount, preimage chainhash.Hash) {
|
||||
paymentHash := chainhash.Hash(sha256.Sum256(preimage[:]))
|
||||
|
||||
invoice := &channeldb.Invoice{
|
||||
CreationDate: time.Now(),
|
||||
Terms: channeldb.ContractTerm{
|
||||
Value: lnwire.NewMSatFromSatoshis(amt),
|
||||
PaymentPreimage: preimage,
|
||||
},
|
||||
}
|
||||
|
||||
i.Lock()
|
||||
i.debugInvoices[paymentHash] = invoice
|
||||
i.Unlock()
|
||||
|
||||
log.Debugf("Adding debug invoice %v", newLogClosure(func() string {
|
||||
return spew.Sdump(invoice)
|
||||
}))
|
||||
}
|
||||
|
||||
// AddInvoice adds a regular invoice for the specified amount, identified by
|
||||
// the passed preimage. Additionally, any memo or receipt data provided will
|
||||
// also be stored on-disk. Once this invoice is added, subsystems within the
|
||||
// daemon add/forward HTLCs are able to obtain the proper preimage required for
|
||||
// redemption in the case that we're the final destination. We also return the
|
||||
// addIndex of the newly created invoice which monotonically increases for each
|
||||
// new invoice added.
|
||||
func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice) (uint64, error) {
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
log.Debugf("Adding invoice %v", newLogClosure(func() string {
|
||||
return spew.Sdump(invoice)
|
||||
}))
|
||||
|
||||
addIndex, err := i.cdb.AddInvoice(invoice)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Now that we've added the invoice, we'll send dispatch a message to
|
||||
// notify the clients of this new invoice.
|
||||
i.notifyClients(invoice, channeldb.ContractOpen)
|
||||
|
||||
return addIndex, nil
|
||||
}
|
||||
|
||||
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
|
||||
// then we're able to pull the funds pending within an HTLC. We'll also return
|
||||
// what the expected min final CLTV delta is, pre-parsed from the payment
|
||||
// request. This may be used by callers to determine if an HTLC is well formed
|
||||
// according to the cltv delta.
|
||||
//
|
||||
// TODO(roasbeef): ignore if settled?
|
||||
func (i *InvoiceRegistry) LookupInvoice(rHash chainhash.Hash) (channeldb.Invoice, uint32, error) {
|
||||
// First check the in-memory debug invoice index to see if this is an
|
||||
// existing invoice added for debugging.
|
||||
i.RLock()
|
||||
debugInv, ok := i.debugInvoices[rHash]
|
||||
i.RUnlock()
|
||||
|
||||
// If found, then simply return the invoice directly.
|
||||
if ok {
|
||||
return *debugInv, 0, nil
|
||||
}
|
||||
|
||||
// Otherwise, we'll check the database to see if there's an existing
|
||||
// matching invoice.
|
||||
invoice, err := i.cdb.LookupInvoice(rHash)
|
||||
if err != nil {
|
||||
return channeldb.Invoice{}, 0, err
|
||||
}
|
||||
|
||||
payReq, err := zpay32.Decode(
|
||||
string(invoice.PaymentRequest), i.activeNetParams,
|
||||
)
|
||||
if err != nil {
|
||||
return channeldb.Invoice{}, 0, err
|
||||
}
|
||||
|
||||
return invoice, uint32(payReq.MinFinalCLTVExpiry()), nil
|
||||
}
|
||||
|
||||
// SettleInvoice attempts to mark an invoice as settled. If the invoice is a
|
||||
// debug invoice, then this method is a noop as debug invoices are never fully
|
||||
// settled.
|
||||
func (i *InvoiceRegistry) SettleInvoice(rHash chainhash.Hash,
|
||||
amtPaid lnwire.MilliSatoshi) error {
|
||||
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
log.Debugf("Settling invoice %x", rHash[:])
|
||||
|
||||
// First check the in-memory debug invoice index to see if this is an
|
||||
// existing invoice added for debugging.
|
||||
if _, ok := i.debugInvoices[rHash]; ok {
|
||||
// Debug invoices are never fully settled, so we simply return
|
||||
// immediately in this case.
|
||||
return nil
|
||||
}
|
||||
|
||||
// If this isn't a debug invoice, then we'll attempt to settle an
|
||||
// invoice matching this rHash on disk (if one exists).
|
||||
invoice, err := i.cdb.SettleInvoice(rHash, amtPaid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Payment received: %v", spew.Sdump(invoice))
|
||||
|
||||
i.notifyClients(invoice, channeldb.ContractSettled)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifyClients notifies all currently registered invoice notification clients
|
||||
// of a newly added/settled invoice.
|
||||
func (i *InvoiceRegistry) notifyClients(invoice *channeldb.Invoice,
|
||||
state channeldb.ContractState) {
|
||||
|
||||
event := &invoiceEvent{
|
||||
state: state,
|
||||
invoice: invoice,
|
||||
}
|
||||
|
||||
select {
|
||||
case i.invoiceEvents <- event:
|
||||
case <-i.quit:
|
||||
}
|
||||
}
|
||||
|
||||
// InvoiceSubscription represents an intent to receive updates for newly added
|
||||
// or settled invoices. For each newly added invoice, a copy of the invoice
|
||||
// will be sent over the NewInvoices channel. Similarly, for each newly settled
|
||||
// invoice, a copy of the invoice will be sent over the SettledInvoices
|
||||
// channel.
|
||||
type InvoiceSubscription struct {
|
||||
cancelled uint32 // To be used atomically.
|
||||
|
||||
// NewInvoices is a channel that we'll use to send all newly created
|
||||
// invoices with an invoice index greater than the specified
|
||||
// StartingInvoiceIndex field.
|
||||
NewInvoices chan *channeldb.Invoice
|
||||
|
||||
// SettledInvoices is a channel that we'll use to send all setted
|
||||
// invoices with an invoices index greater than the specified
|
||||
// StartingInvoiceIndex field.
|
||||
SettledInvoices chan *channeldb.Invoice
|
||||
|
||||
// addIndex is the highest add index the caller knows of. We'll use
|
||||
// this information to send out an event backlog to the notifications
|
||||
// subscriber. Any new add events with an index greater than this will
|
||||
// be dispatched before any new notifications are sent out.
|
||||
addIndex uint64
|
||||
|
||||
// settleIndex is the highest settle index the caller knows of. We'll
|
||||
// use this information to send out an event backlog to the
|
||||
// notifications subscriber. Any new settle events with an index
|
||||
// greater than this will be dispatched before any new notifications
|
||||
// are sent out.
|
||||
settleIndex uint64
|
||||
|
||||
ntfnQueue *queue.ConcurrentQueue
|
||||
|
||||
id uint32
|
||||
|
||||
inv *InvoiceRegistry
|
||||
|
||||
cancelChan chan struct{}
|
||||
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
|
||||
// resources.
|
||||
func (i *InvoiceSubscription) Cancel() {
|
||||
if !atomic.CompareAndSwapUint32(&i.cancelled, 0, 1) {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case i.inv.subscriptionCancels <- i.id:
|
||||
case <-i.inv.quit:
|
||||
}
|
||||
|
||||
i.ntfnQueue.Stop()
|
||||
close(i.cancelChan)
|
||||
|
||||
i.wg.Wait()
|
||||
}
|
||||
|
||||
// SubscribeNotifications returns an InvoiceSubscription which allows the
|
||||
// caller to receive async notifications when any invoices are settled or
|
||||
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
|
||||
// by first sending out all new events with an invoice index _greater_ than
|
||||
// this value. Afterwards, we'll send out real-time notifications.
|
||||
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription {
|
||||
client := &InvoiceSubscription{
|
||||
NewInvoices: make(chan *channeldb.Invoice),
|
||||
SettledInvoices: make(chan *channeldb.Invoice),
|
||||
addIndex: addIndex,
|
||||
settleIndex: settleIndex,
|
||||
inv: i,
|
||||
ntfnQueue: queue.NewConcurrentQueue(20),
|
||||
cancelChan: make(chan struct{}),
|
||||
}
|
||||
client.ntfnQueue.Start()
|
||||
|
||||
i.clientMtx.Lock()
|
||||
client.id = i.nextClientID
|
||||
i.nextClientID++
|
||||
i.clientMtx.Unlock()
|
||||
|
||||
// Before we register this new invoice subscription, we'll launch a new
|
||||
// goroutine that will proxy all notifications appended to the end of
|
||||
// the concurrent queue to the two client-side channels the caller will
|
||||
// feed off of.
|
||||
i.wg.Add(1)
|
||||
go func() {
|
||||
defer i.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
// A new invoice event has been sent by the
|
||||
// invoiceRegistry! We'll figure out if this is an add
|
||||
// event or a settle event, then dispatch the event to
|
||||
// the client.
|
||||
case ntfn := <-client.ntfnQueue.ChanOut():
|
||||
invoiceEvent := ntfn.(*invoiceEvent)
|
||||
|
||||
var targetChan chan *channeldb.Invoice
|
||||
switch invoiceEvent.state {
|
||||
case channeldb.ContractOpen:
|
||||
targetChan = client.NewInvoices
|
||||
case channeldb.ContractSettled:
|
||||
targetChan = client.SettledInvoices
|
||||
default:
|
||||
log.Errorf("unknown invoice "+
|
||||
"state: %v", invoiceEvent.state)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case targetChan <- invoiceEvent.invoice:
|
||||
|
||||
case <-client.cancelChan:
|
||||
return
|
||||
|
||||
case <-i.quit:
|
||||
return
|
||||
}
|
||||
|
||||
case <-client.cancelChan:
|
||||
return
|
||||
|
||||
case <-i.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case i.newSubscriptions <- client:
|
||||
case <-i.quit:
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
45
invoices/log.go
Normal file
45
invoices/log.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package invoices
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/lightningnetwork/lnd/build"
|
||||
)
|
||||
|
||||
// log is a logger that is initialized with no output filters. This
|
||||
// means the package will not perform any logging by default until the caller
|
||||
// requests it.
|
||||
var log btclog.Logger
|
||||
|
||||
// The default amount of logging is none.
|
||||
func init() {
|
||||
UseLogger(build.NewSubLogger("INVC", nil))
|
||||
}
|
||||
|
||||
// DisableLog disables all library log output. Logging output is disabled
|
||||
// by default until UseLogger is called.
|
||||
func DisableLog() {
|
||||
UseLogger(btclog.Disabled)
|
||||
}
|
||||
|
||||
// UseLogger uses a specified Logger to output package logging info.
|
||||
// This should be used in preference to SetLogWriter if the caller is also
|
||||
// using btclog.
|
||||
func UseLogger(logger btclog.Logger) {
|
||||
log = logger
|
||||
}
|
||||
|
||||
// logClosure is used to provide a closure over expensive logging operations so
|
||||
// don't have to be performed when the logging level doesn't warrant it.
|
||||
type logClosure func() string
|
||||
|
||||
// String invokes the underlying function and returns the result.
|
||||
func (c logClosure) String() string {
|
||||
return c()
|
||||
}
|
||||
|
||||
// newLogClosure returns a new closure over a function that returns a string
|
||||
// which itself provides a Stringer interface so that it can be used with the
|
||||
// logging system.
|
||||
func newLogClosure(c func() string) logClosure {
|
||||
return logClosure(c)
|
||||
}
|
Reference in New Issue
Block a user