mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-31 17:51:33 +02:00
multi: add more logging when fetching invoices and payments.
This commit is contained in:
@@ -142,6 +142,10 @@ const (
|
||||
ampStateSettleDateType tlv.Type = 3
|
||||
ampStateCircuitKeysType tlv.Type = 4
|
||||
ampStateAmtPaidType tlv.Type = 5
|
||||
|
||||
// invoiceScanBatchSize is the number we use limiting the logging output
|
||||
// of invoice processing.
|
||||
invoiceScanBatchSize = 1000
|
||||
)
|
||||
|
||||
// AddInvoice inserts the targeted invoice into the database. If the invoice has
|
||||
@@ -241,7 +245,11 @@ func (d *DB) AddInvoice(_ context.Context, newInvoice *invpkg.Invoice,
|
||||
func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) (
|
||||
[]invpkg.Invoice, error) {
|
||||
|
||||
var newInvoices []invpkg.Invoice
|
||||
var (
|
||||
newInvoices []invpkg.Invoice
|
||||
start = time.Now()
|
||||
processedCount int
|
||||
)
|
||||
|
||||
// If an index of zero was specified, then in order to maintain
|
||||
// backwards compat, we won't send out any new invoices.
|
||||
@@ -274,7 +282,6 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) (
|
||||
addSeqNo, invoiceKey := invoiceCursor.Next()
|
||||
|
||||
for ; addSeqNo != nil && bytes.Compare(addSeqNo, startIndex[:]) > 0; addSeqNo, invoiceKey = invoiceCursor.Next() {
|
||||
|
||||
// For each key found, we'll look up the actual
|
||||
// invoice, then accumulate it into our return value.
|
||||
invoice, err := fetchInvoice(
|
||||
@@ -285,6 +292,13 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) (
|
||||
}
|
||||
|
||||
newInvoices = append(newInvoices, invoice)
|
||||
|
||||
processedCount++
|
||||
if processedCount%invoiceScanBatchSize == 0 {
|
||||
log.Debugf("Processed %d invoices since "+
|
||||
"invoice with add index %v",
|
||||
processedCount, sinceAddIndex)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -295,6 +309,12 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Debugf("Completed scanning invoices added since index %v: "+
|
||||
"total_processed=%d, found_invoices=%d, elapsed=%v",
|
||||
sinceAddIndex, processedCount, len(newInvoices),
|
||||
elapsed.Round(time.Millisecond))
|
||||
|
||||
return newInvoices, nil
|
||||
}
|
||||
|
||||
@@ -1045,7 +1065,11 @@ func (k *kvInvoiceUpdater) serializeAndStoreInvoice() error {
|
||||
func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) (
|
||||
[]invpkg.Invoice, error) {
|
||||
|
||||
var settledInvoices []invpkg.Invoice
|
||||
var (
|
||||
settledInvoices []invpkg.Invoice
|
||||
start = time.Now()
|
||||
processedCount int
|
||||
)
|
||||
|
||||
// If an index of zero was specified, then in order to maintain
|
||||
// backwards compat, we won't send out any new invoices.
|
||||
@@ -1104,6 +1128,13 @@ func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) (
|
||||
}
|
||||
|
||||
settledInvoices = append(settledInvoices, invoice)
|
||||
|
||||
processedCount++
|
||||
if processedCount%invoiceScanBatchSize == 0 {
|
||||
log.Debugf("Processed %d settled invoices "+
|
||||
"since invoice with settle index %v",
|
||||
processedCount, sinceSettleIndex)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1114,6 +1145,12 @@ func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Debugf("Completed scanning invoices settled since index %v: "+
|
||||
"total_processed=%d, found_invoices=%d, elapsed=%v",
|
||||
sinceSettleIndex, processedCount, len(settledInvoices),
|
||||
elapsed.Round(time.Millisecond))
|
||||
|
||||
return settledInvoices, nil
|
||||
}
|
||||
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
@@ -16,6 +17,10 @@ const (
|
||||
// paymentSeqBlockSize is the block size used when we batch allocate
|
||||
// payment sequences for future payments.
|
||||
paymentSeqBlockSize = 1000
|
||||
|
||||
// paymentBatchSize is the number we use limiting the logging output
|
||||
// of payment processing.
|
||||
paymentBatchSize = 1000
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -770,7 +775,12 @@ func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
|
||||
|
||||
// FetchInFlightPayments returns all payments with status InFlight.
|
||||
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
|
||||
var inFlights []*MPPayment
|
||||
var (
|
||||
inFlights []*MPPayment
|
||||
start = time.Now()
|
||||
processedCount int
|
||||
)
|
||||
|
||||
err := kvdb.View(p.db, func(tx kvdb.RTx) error {
|
||||
payments := tx.ReadBucket(paymentsRootBucket)
|
||||
if payments == nil {
|
||||
@@ -788,6 +798,14 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
|
||||
return err
|
||||
}
|
||||
|
||||
processedCount++
|
||||
if processedCount%paymentBatchSize == 0 {
|
||||
log.Debugf("Scanning inflight payments "+
|
||||
"(in progress), processed %d, last "+
|
||||
"processed payment: %v", processedCount,
|
||||
p.Info)
|
||||
}
|
||||
|
||||
// Skip the payment if it's terminated.
|
||||
if p.Terminated() {
|
||||
return nil
|
||||
@@ -803,5 +821,10 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Debugf("Completed scanning inflight payments: total_processed=%d, "+
|
||||
"found_inflight=%d, elapsed=%v", processedCount, len(inFlights),
|
||||
elapsed.Round(time.Millisecond))
|
||||
|
||||
return inFlights, nil
|
||||
}
|
||||
|
@@ -202,6 +202,12 @@ type PaymentCreationInfo struct {
|
||||
FirstHopCustomRecords lnwire.CustomRecords
|
||||
}
|
||||
|
||||
func (p *PaymentCreationInfo) String() string {
|
||||
return fmt.Sprintf("payment_id=%v, amount=%v, created_at=%v, "+
|
||||
"payment_request=%v", p.PaymentIdentifier, p.Value,
|
||||
p.CreationTime, p.PaymentRequest)
|
||||
}
|
||||
|
||||
// htlcBucketKey creates a composite key from prefix and id where the result is
|
||||
// simply the two concatenated.
|
||||
func htlcBucketKey(prefix, id []byte) []byte {
|
||||
|
@@ -484,16 +484,25 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
|
||||
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
|
||||
client *InvoiceSubscription) error {
|
||||
|
||||
log.Debugf("Collecting added invoices since %v for client %v",
|
||||
client.addIndex, client.id)
|
||||
|
||||
addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Collecting settled invoices since %v for client %v",
|
||||
client.settleIndex, client.id)
|
||||
|
||||
settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Delivering %d added invoices and %d settled invoices "+
|
||||
"for client %v", len(addEvents), len(settleEvents), client.id)
|
||||
|
||||
// If we have any to deliver, then we'll append them to the end of the
|
||||
// notification queue in order to catch up the client before delivering
|
||||
// any new notifications.
|
||||
|
@@ -24,6 +24,10 @@ const (
|
||||
// defaultQueryPaginationLimit is used in the LIMIT clause of the SQL
|
||||
// queries to limit the number of rows returned.
|
||||
defaultQueryPaginationLimit = 100
|
||||
|
||||
// invoiceScanBatchSize is the number we use limiting the logging output
|
||||
// when scanning invoices.
|
||||
invoiceScanBatchSize = 1000
|
||||
)
|
||||
|
||||
// SQLInvoiceQueries is an interface that defines the set of operations that can
|
||||
@@ -785,7 +789,11 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
|
||||
func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
|
||||
[]Invoice, error) {
|
||||
|
||||
var invoices []Invoice
|
||||
var (
|
||||
invoices []Invoice
|
||||
start = time.Now()
|
||||
processedCount int
|
||||
)
|
||||
|
||||
if idx == 0 {
|
||||
return invoices, nil
|
||||
@@ -819,6 +827,14 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
|
||||
}
|
||||
|
||||
invoices = append(invoices, *invoice)
|
||||
|
||||
processedCount++
|
||||
if processedCount%invoiceScanBatchSize == 0 {
|
||||
log.Debugf("Processed %d settled "+
|
||||
"invoices since invoice with "+
|
||||
"settle index %v",
|
||||
processedCount, idx)
|
||||
}
|
||||
}
|
||||
|
||||
return len(rows), nil
|
||||
@@ -873,6 +889,13 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
|
||||
}
|
||||
|
||||
invoices = append(invoices, *invoice)
|
||||
|
||||
processedCount++
|
||||
if processedCount%invoiceScanBatchSize == 0 {
|
||||
log.Debugf("Processed %d settled invoices "+
|
||||
"since invoice with settle index %v",
|
||||
processedCount, idx)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -884,6 +907,12 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
|
||||
"index (excluding) %d: %w", idx, err)
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Debugf("Completed scanning invoices settled since index %v: "+
|
||||
"total_processed=%d, found_invoices=%d, elapsed=%v",
|
||||
idx, processedCount, len(invoices),
|
||||
elapsed.Round(time.Millisecond))
|
||||
|
||||
return invoices, nil
|
||||
}
|
||||
|
||||
@@ -896,7 +925,11 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
|
||||
func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
|
||||
[]Invoice, error) {
|
||||
|
||||
var result []Invoice
|
||||
var (
|
||||
result []Invoice
|
||||
start = time.Now()
|
||||
processedCount int
|
||||
)
|
||||
|
||||
if idx == 0 {
|
||||
return result, nil
|
||||
@@ -928,6 +961,13 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
|
||||
}
|
||||
|
||||
result = append(result, *invoice)
|
||||
|
||||
processedCount++
|
||||
if processedCount%invoiceScanBatchSize == 0 {
|
||||
log.Debugf("Processed %d invoices "+
|
||||
"added since invoice with add "+
|
||||
"index %v", processedCount, idx)
|
||||
}
|
||||
}
|
||||
|
||||
return len(rows), nil
|
||||
@@ -941,6 +981,12 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
|
||||
"index %d: %w", idx, err)
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.Debugf("Completed scanning invoices added since index %v: "+
|
||||
"total_processed=%d, found_invoices=%d, elapsed=%v",
|
||||
idx, processedCount, len(result),
|
||||
elapsed.Round(time.Millisecond))
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
@@ -365,10 +365,13 @@ func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
|
||||
p.subscriberIndex++
|
||||
p.subscribersMtx.Unlock()
|
||||
|
||||
log.Debugf("Scanning for inflight payments")
|
||||
inflightPayments, err := p.db.FetchInFlightPayments()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Debugf("Scanning for inflight payments finished",
|
||||
len(inflightPayments))
|
||||
|
||||
for index := range inflightPayments {
|
||||
// Always write current payment state to the channel.
|
||||
|
@@ -1400,11 +1400,15 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi],
|
||||
// lifecycles.
|
||||
func (r *ChannelRouter) resumePayments() error {
|
||||
// Get all payments that are inflight.
|
||||
log.Debugf("Scanning for inflight payments")
|
||||
payments, err := r.cfg.Control.FetchInFlightPayments()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("Scanning finished, found %d inflight payments",
|
||||
len(payments))
|
||||
|
||||
// Before we restart existing payments and start accepting more
|
||||
// payments to be made, we clean the network result store of the
|
||||
// Switch. We do this here at startup to ensure no more payments can be
|
||||
|
Reference in New Issue
Block a user