From 440ed31419db9c1218ef0d4a40920dcd1640e4e1 Mon Sep 17 00:00:00 2001 From: ziggie Date: Fri, 18 Apr 2025 15:27:00 +0200 Subject: [PATCH] multi: add more logging when fetching invoices and payments. --- channeldb/invoices.go | 43 ++++++++++++++++++++++++++++--- channeldb/payment_control.go | 25 +++++++++++++++++- channeldb/payments.go | 6 +++++ invoices/invoiceregistry.go | 9 +++++++ invoices/sql_store.go | 50 ++++++++++++++++++++++++++++++++++-- routing/control_tower.go | 3 +++ routing/router.go | 4 +++ 7 files changed, 134 insertions(+), 6 deletions(-) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 13439bf3d..1b13c3336 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -142,6 +142,10 @@ const ( ampStateSettleDateType tlv.Type = 3 ampStateCircuitKeysType tlv.Type = 4 ampStateAmtPaidType tlv.Type = 5 + + // invoiceScanBatchSize is the number we use limiting the logging output + // of invoice processing. + invoiceScanBatchSize = 1000 ) // AddInvoice inserts the targeted invoice into the database. If the invoice has @@ -241,7 +245,11 @@ func (d *DB) AddInvoice(_ context.Context, newInvoice *invpkg.Invoice, func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) ( []invpkg.Invoice, error) { - var newInvoices []invpkg.Invoice + var ( + newInvoices []invpkg.Invoice + start = time.Now() + processedCount int + ) // If an index of zero was specified, then in order to maintain // backwards compat, we won't send out any new invoices. @@ -274,7 +282,6 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) ( addSeqNo, invoiceKey := invoiceCursor.Next() for ; addSeqNo != nil && bytes.Compare(addSeqNo, startIndex[:]) > 0; addSeqNo, invoiceKey = invoiceCursor.Next() { - // For each key found, we'll look up the actual // invoice, then accumulate it into our return value. invoice, err := fetchInvoice( @@ -285,6 +292,13 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) ( } newInvoices = append(newInvoices, invoice) + + processedCount++ + if processedCount%invoiceScanBatchSize == 0 { + log.Debugf("Processed %d invoices since "+ + "invoice with add index %v", + processedCount, sinceAddIndex) + } } return nil @@ -295,6 +309,12 @@ func (d *DB) InvoicesAddedSince(_ context.Context, sinceAddIndex uint64) ( return nil, err } + elapsed := time.Since(start) + log.Debugf("Completed scanning invoices added since index %v: "+ + "total_processed=%d, found_invoices=%d, elapsed=%v", + sinceAddIndex, processedCount, len(newInvoices), + elapsed.Round(time.Millisecond)) + return newInvoices, nil } @@ -1045,7 +1065,11 @@ func (k *kvInvoiceUpdater) serializeAndStoreInvoice() error { func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) ( []invpkg.Invoice, error) { - var settledInvoices []invpkg.Invoice + var ( + settledInvoices []invpkg.Invoice + start = time.Now() + processedCount int + ) // If an index of zero was specified, then in order to maintain // backwards compat, we won't send out any new invoices. @@ -1104,6 +1128,13 @@ func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) ( } settledInvoices = append(settledInvoices, invoice) + + processedCount++ + if processedCount%invoiceScanBatchSize == 0 { + log.Debugf("Processed %d settled invoices "+ + "since invoice with settle index %v", + processedCount, sinceSettleIndex) + } } return nil @@ -1114,6 +1145,12 @@ func (d *DB) InvoicesSettledSince(_ context.Context, sinceSettleIndex uint64) ( return nil, err } + elapsed := time.Since(start) + log.Debugf("Completed scanning invoices settled since index %v: "+ + "total_processed=%d, found_invoices=%d, elapsed=%v", + sinceSettleIndex, processedCount, len(settledInvoices), + elapsed.Round(time.Millisecond)) + return settledInvoices, nil } diff --git a/channeldb/payment_control.go b/channeldb/payment_control.go index 2369d2f7e..c369782d1 100644 --- a/channeldb/payment_control.go +++ b/channeldb/payment_control.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" @@ -16,6 +17,10 @@ const ( // paymentSeqBlockSize is the block size used when we batch allocate // payment sequences for future payments. paymentSeqBlockSize = 1000 + + // paymentBatchSize is the number we use limiting the logging output + // of payment processing. + paymentBatchSize = 1000 ) var ( @@ -770,7 +775,12 @@ func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) { // FetchInFlightPayments returns all payments with status InFlight. func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) { - var inFlights []*MPPayment + var ( + inFlights []*MPPayment + start = time.Now() + processedCount int + ) + err := kvdb.View(p.db, func(tx kvdb.RTx) error { payments := tx.ReadBucket(paymentsRootBucket) if payments == nil { @@ -788,6 +798,14 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) { return err } + processedCount++ + if processedCount%paymentBatchSize == 0 { + log.Debugf("Scanning inflight payments "+ + "(in progress), processed %d, last "+ + "processed payment: %v", processedCount, + p.Info) + } + // Skip the payment if it's terminated. if p.Terminated() { return nil @@ -803,5 +821,10 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) { return nil, err } + elapsed := time.Since(start) + log.Debugf("Completed scanning inflight payments: total_processed=%d, "+ + "found_inflight=%d, elapsed=%v", processedCount, len(inFlights), + elapsed.Round(time.Millisecond)) + return inFlights, nil } diff --git a/channeldb/payments.go b/channeldb/payments.go index 9d70cfd95..947ec1ea3 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -202,6 +202,12 @@ type PaymentCreationInfo struct { FirstHopCustomRecords lnwire.CustomRecords } +func (p *PaymentCreationInfo) String() string { + return fmt.Sprintf("payment_id=%v, amount=%v, created_at=%v, "+ + "payment_request=%v", p.PaymentIdentifier, p.Value, + p.CreationTime, p.PaymentRequest) +} + // htlcBucketKey creates a composite key from prefix and id where the result is // simply the two concatenated. func htlcBucketKey(prefix, id []byte) []byte { diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 44b7c7f29..dc9662635 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -484,16 +484,25 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context, client *InvoiceSubscription) error { + log.Debugf("Collecting added invoices since %v for client %v", + client.addIndex, client.id) + addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex) if err != nil { return err } + log.Debugf("Collecting settled invoices since %v for client %v", + client.settleIndex, client.id) + settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex) if err != nil { return err } + log.Debugf("Delivering %d added invoices and %d settled invoices "+ + "for client %v", len(addEvents), len(settleEvents), client.id) + // If we have any to deliver, then we'll append them to the end of the // notification queue in order to catch up the client before delivering // any new notifications. diff --git a/invoices/sql_store.go b/invoices/sql_store.go index 01a140f8d..94149c36a 100644 --- a/invoices/sql_store.go +++ b/invoices/sql_store.go @@ -24,6 +24,10 @@ const ( // defaultQueryPaginationLimit is used in the LIMIT clause of the SQL // queries to limit the number of rows returned. defaultQueryPaginationLimit = 100 + + // invoiceScanBatchSize is the number we use limiting the logging output + // when scanning invoices. + invoiceScanBatchSize = 1000 ) // SQLInvoiceQueries is an interface that defines the set of operations that can @@ -785,7 +789,11 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) ( func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( []Invoice, error) { - var invoices []Invoice + var ( + invoices []Invoice + start = time.Now() + processedCount int + ) if idx == 0 { return invoices, nil @@ -819,6 +827,14 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( } invoices = append(invoices, *invoice) + + processedCount++ + if processedCount%invoiceScanBatchSize == 0 { + log.Debugf("Processed %d settled "+ + "invoices since invoice with "+ + "settle index %v", + processedCount, idx) + } } return len(rows), nil @@ -873,6 +889,13 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( } invoices = append(invoices, *invoice) + + processedCount++ + if processedCount%invoiceScanBatchSize == 0 { + log.Debugf("Processed %d settled invoices "+ + "since invoice with settle index %v", + processedCount, idx) + } } return nil @@ -884,6 +907,12 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( "index (excluding) %d: %w", idx, err) } + elapsed := time.Since(start) + log.Debugf("Completed scanning invoices settled since index %v: "+ + "total_processed=%d, found_invoices=%d, elapsed=%v", + idx, processedCount, len(invoices), + elapsed.Round(time.Millisecond)) + return invoices, nil } @@ -896,7 +925,11 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) ( func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) ( []Invoice, error) { - var result []Invoice + var ( + result []Invoice + start = time.Now() + processedCount int + ) if idx == 0 { return result, nil @@ -928,6 +961,13 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) ( } result = append(result, *invoice) + + processedCount++ + if processedCount%invoiceScanBatchSize == 0 { + log.Debugf("Processed %d invoices "+ + "added since invoice with add "+ + "index %v", processedCount, idx) + } } return len(rows), nil @@ -941,6 +981,12 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) ( "index %d: %w", idx, err) } + elapsed := time.Since(start) + log.Debugf("Completed scanning invoices added since index %v: "+ + "total_processed=%d, found_invoices=%d, elapsed=%v", + idx, processedCount, len(result), + elapsed.Round(time.Millisecond)) + return result, nil } diff --git a/routing/control_tower.go b/routing/control_tower.go index 174ff6a7f..61ab33bd2 100644 --- a/routing/control_tower.go +++ b/routing/control_tower.go @@ -365,10 +365,13 @@ func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) { p.subscriberIndex++ p.subscribersMtx.Unlock() + log.Debugf("Scanning for inflight payments") inflightPayments, err := p.db.FetchInFlightPayments() if err != nil { return nil, err } + log.Debugf("Scanning for inflight payments finished", + len(inflightPayments)) for index := range inflightPayments { // Always write current payment state to the channel. diff --git a/routing/router.go b/routing/router.go index e3c96cf03..440373200 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1400,11 +1400,15 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], // lifecycles. func (r *ChannelRouter) resumePayments() error { // Get all payments that are inflight. + log.Debugf("Scanning for inflight payments") payments, err := r.cfg.Control.FetchInFlightPayments() if err != nil { return err } + log.Debugf("Scanning finished, found %d inflight payments", + len(payments)) + // Before we restart existing payments and start accepting more // payments to be made, we clean the network result store of the // Switch. We do this here at startup to ensure no more payments can be