channeldb: optimize updateInvoice for AMP by only reading relevant HTLCs

In this commit, we update the logic in `updateInvoice` to allow callers
to pass in either a hint, or the setID in the update callback. This
makes things more efficient for AMP invoices with thousands of recurring
payments, as we no longer need to read out _all_ the invoices each time
we go to update the state of a few HTLCs.
This commit is contained in:
Olaoluwa Osuntokun 2021-10-14 18:29:41 +02:00
parent f969d81e1a
commit a4f8842831
No known key found for this signature in database
GPG Key ID: 3BBD59E99B280306
3 changed files with 117 additions and 34 deletions

View File

@ -215,7 +215,7 @@ func testInvoiceWorkflow(t *testing.T, test invWorkflowTest) {
// now have the settled bit toggle to true and a non-default
// SettledDate
payAmt := fakeInvoice.Terms.Value * 2
_, err = db.UpdateInvoice(ref, getUpdateInvoice(payAmt))
_, err = db.UpdateInvoice(ref, nil, getUpdateInvoice(payAmt))
if err != nil {
t.Fatalf("unable to settle invoice: %v", err)
}
@ -432,7 +432,7 @@ func TestInvRefEquivocation(t *testing.T) {
nop := func(_ *Invoice) (*InvoiceUpdateDesc, error) {
return nil, nil
}
_, err = db.UpdateInvoice(ref, nop)
_, err = db.UpdateInvoice(ref, nil, nop)
require.Error(t, err, ErrInvRefEquivocation)
}
@ -471,7 +471,7 @@ func TestInvoiceCancelSingleHtlc(t *testing.T) {
}
ref := InvoiceRefByHash(paymentHash)
invoice, err := db.UpdateInvoice(ref,
invoice, err := db.UpdateInvoice(ref, nil,
func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
return &InvoiceUpdateDesc{
AddHtlcs: map[CircuitKey]*HtlcAcceptDesc{
@ -490,7 +490,7 @@ func TestInvoiceCancelSingleHtlc(t *testing.T) {
}
// Cancel the htlc again.
invoice, err = db.UpdateInvoice(ref,
invoice, err = db.UpdateInvoice(ref, nil,
func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
return &InvoiceUpdateDesc{
CancelHtlcs: map[CircuitKey]struct{}{
@ -608,7 +608,7 @@ func TestInvoiceAddTimeSeries(t *testing.T) {
ref := InvoiceRefByHash(paymentHash)
_, err := db.UpdateInvoice(
ref, getUpdateInvoice(invoice.Terms.Value),
ref, nil, getUpdateInvoice(invoice.Terms.Value),
)
if err != nil {
t.Fatalf("unable to settle invoice: %v", err)
@ -753,7 +753,7 @@ func TestDuplicateSettleInvoice(t *testing.T) {
// With the invoice in the DB, we'll now attempt to settle the invoice.
ref := InvoiceRefByHash(payHash)
dbInvoice, err := db.UpdateInvoice(ref, getUpdateInvoice(amt))
dbInvoice, err := db.UpdateInvoice(ref, nil, getUpdateInvoice(amt))
if err != nil {
t.Fatalf("unable to settle invoice: %v", err)
}
@ -779,7 +779,7 @@ func TestDuplicateSettleInvoice(t *testing.T) {
// If we try to settle the invoice again, then we should get the very
// same invoice back, but with an error this time.
dbInvoice, err = db.UpdateInvoice(ref, getUpdateInvoice(amt))
dbInvoice, err = db.UpdateInvoice(ref, nil, getUpdateInvoice(amt))
if err != ErrInvoiceAlreadySettled {
t.Fatalf("expected ErrInvoiceAlreadySettled")
}
@ -827,7 +827,7 @@ func TestQueryInvoices(t *testing.T) {
// We'll only settle half of all invoices created.
if i%2 == 0 {
ref := InvoiceRefByHash(paymentHash)
_, err := db.UpdateInvoice(ref, getUpdateInvoice(amt))
_, err := db.UpdateInvoice(ref, nil, getUpdateInvoice(amt))
if err != nil {
t.Fatalf("unable to settle invoice: %v", err)
}
@ -1143,7 +1143,7 @@ func TestCustomRecords(t *testing.T) {
}
ref := InvoiceRefByHash(paymentHash)
_, err = db.UpdateInvoice(ref,
_, err = db.UpdateInvoice(ref, nil,
func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
return &InvoiceUpdateDesc{
AddHtlcs: map[CircuitKey]*HtlcAcceptDesc{
@ -1216,7 +1216,7 @@ func testInvoiceHtlcAMPFields(t *testing.T, isAMP bool) {
}
ref := InvoiceRefByHash(payHash)
_, err = db.UpdateInvoice(ref,
_, err = db.UpdateInvoice(ref, nil,
func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
return &InvoiceUpdateDesc{
AddHtlcs: map[CircuitKey]*HtlcAcceptDesc{
@ -1411,7 +1411,7 @@ func TestSetIDIndex(t *testing.T) {
// invoice.
ref := InvoiceRefByHashAndAddr(payHash, invoice.Terms.PaymentAddr)
dbInvoice, err := db.UpdateInvoice(
ref, updateAcceptAMPHtlc(0, amt, setID, true),
ref, (*SetID)(setID), updateAcceptAMPHtlc(0, amt, setID, true),
)
require.Nil(t, err)
@ -1456,7 +1456,7 @@ func TestSetIDIndex(t *testing.T) {
ref2 := InvoiceRefByHashAndAddr(payHash2, invoice2.Terms.PaymentAddr)
_, err = db.UpdateInvoice(
ref2, updateAcceptAMPHtlc(0, amt, setID, true),
ref2, (*SetID)(setID), updateAcceptAMPHtlc(0, amt, setID, true),
)
require.Equal(t, ErrDuplicateSetID{setID: *setID}, err)
@ -1465,11 +1465,11 @@ func TestSetIDIndex(t *testing.T) {
setID2 := &[32]byte{2}
_, err = db.UpdateInvoice(
ref, updateAcceptAMPHtlc(1, amt, setID2, false),
ref, (*SetID)(setID2), updateAcceptAMPHtlc(1, amt, setID2, false),
)
require.Nil(t, err)
dbInvoice, err = db.UpdateInvoice(
ref, updateAcceptAMPHtlc(2, amt, setID2, false),
ref, (*SetID)(setID2), updateAcceptAMPHtlc(2, amt, setID2, false),
)
require.Nil(t, err)
@ -1518,7 +1518,7 @@ func TestSetIDIndex(t *testing.T) {
// Now attempt to settle a non-existent HTLC set, this set ID is the
// zero setID so it isn't used for anything internally.
_, err = db.UpdateInvoice(
ref,
ref, nil,
getUpdateInvoiceAMPSettle(&[32]byte{}, [32]byte{}, CircuitKey{HtlcID: 99}),
)
require.Equal(t, ErrEmptyHTLCSet, err)
@ -1527,7 +1527,7 @@ func TestSetIDIndex(t *testing.T) {
// the accepted state and shouldn't be canceled, since we permit an
// invoice to be settled multiple times.
_, err = db.UpdateInvoice(
ref,
ref, (*SetID)(setID),
getUpdateInvoiceAMPSettle(setID, preimage, CircuitKey{HtlcID: 0}),
)
require.Nil(t, err)
@ -1557,7 +1557,7 @@ func TestSetIDIndex(t *testing.T) {
// If we try to settle the same set ID again, then we should get an
// error, as it's already been settled.
_, err = db.UpdateInvoice(
ref,
ref, (*SetID)(setID),
getUpdateInvoiceAMPSettle(setID, preimage, CircuitKey{HtlcID: 0}),
)
require.Equal(t, ErrEmptyHTLCSet, err)
@ -1567,7 +1567,7 @@ func TestSetIDIndex(t *testing.T) {
// settle an invoice with a new setID after one has already been fully
// settled.
_, err = db.UpdateInvoice(
ref,
ref, (*SetID)(setID2),
getUpdateInvoiceAMPSettle(
setID2, preimage, CircuitKey{HtlcID: 1}, CircuitKey{HtlcID: 2},
),
@ -1714,7 +1714,7 @@ func TestUnexpectedInvoicePreimage(t *testing.T) {
// in order to settle an MPP invoice, the InvoiceRef must present a
// payment hash against which to validate the preimage.
_, err = db.UpdateInvoice(
InvoiceRefByAddr(invoice.Terms.PaymentAddr),
InvoiceRefByAddr(invoice.Terms.PaymentAddr), nil,
getUpdateInvoice(invoice.Terms.Value),
)
@ -1780,7 +1780,7 @@ func testUpdateHTLCPreimages(t *testing.T, test updateHTLCPreimageTestCase) {
// invoice.
ref := InvoiceRefByAddr(invoice.Terms.PaymentAddr)
dbInvoice, err := db.UpdateInvoice(
ref, updateAcceptAMPHtlc(0, amt, setID, true),
ref, (*SetID)(setID), updateAcceptAMPHtlc(0, amt, setID, true),
)
require.Nil(t, err)
@ -1809,7 +1809,7 @@ func testUpdateHTLCPreimages(t *testing.T, test updateHTLCPreimageTestCase) {
}
// Now settle the HTLC set and assert the resulting error.
_, err = db.UpdateInvoice(ref, updateInvoice)
_, err = db.UpdateInvoice(ref, (*SetID)(setID), updateInvoice)
require.Equal(t, test.expError, err)
}
@ -2477,7 +2477,7 @@ func TestUpdateHTLC(t *testing.T) {
func testUpdateHTLC(t *testing.T, test updateHTLCTest) {
htlc := test.input.Copy()
err, _ := updateHtlc(testNow, htlc, test.invState, test.setID)
_, err := updateHtlc(testNow, htlc, test.invState, test.setID)
require.Equal(t, test.expErr, err)
require.Equal(t, test.output, *htlc)
}
@ -2506,7 +2506,7 @@ func TestDeleteInvoices(t *testing.T) {
// Settle the second invoice.
if i == 1 {
invoice, err = db.UpdateInvoice(
InvoiceRefByHash(paymentHash),
InvoiceRefByHash(paymentHash), nil,
getUpdateInvoice(invoice.Terms.Value),
)
require.NoError(t, err, "unable to settle invoice")
@ -2564,7 +2564,6 @@ func TestDeleteInvoices(t *testing.T) {
// Delete should succeed with all the valid references.
require.NoError(t, db.DeleteInvoice(invoicesToDelete))
assertInvoiceCount(0)
}
// TestAddInvoiceInvalidFeatureDeps asserts that inserting an invoice with

View File

@ -227,6 +227,26 @@ const (
ampStateAmtPaidType tlv.Type = 5
)
// RefModifier is a modification on top of a base invoice ref. It allows the
// caller to opt to skip out on HTLCs for a given payAddr, or only return the
// set of specified HTLCs for a given setID.
type RefModifier uint8
const (
// DefaultModifier is the base modifier that doesn't change any behavior.
DefaultModifier RefModifier = iota
// HtlcSetOnlyModifier can only be used with a setID based invoice ref, and
// specifies that only the set of HTLCs related to that setID are to be
// returned.
HtlcSetOnlyModifier
// HtlcSetOnlyModifier can only be used with a payAddr based invoice ref,
// and specifies that the returned invoice shouldn't include any HTLCs at
// all.
HtlcSetBlankModifier
)
// InvoiceRef is a composite identifier for invoices. Invoices can be referenced
// by various combinations of payment hash and payment addr, in certain contexts
// only some of these are known. An InvoiceRef and its constructors thus
@ -252,6 +272,10 @@ type InvoiceRef struct {
// invoice registry will always query for the invoice by
// payHash+payAddr.
setID *[32]byte
// refModifier allows an invoice ref to include or exclude specific
// HTLC sets based on the payAddr or setId.
refModifier RefModifier
}
// InvoiceRefByHash creates an InvoiceRef that queries for an invoice only by
@ -282,6 +306,16 @@ func InvoiceRefByAddr(addr [32]byte) InvoiceRef {
}
}
// InvoiceRefByAddrBlankHtlc creates an InvoiceRef that queries the payment addr index
// for an invoice with the provided payment address, but excludes any of the
// core HTLC information.
func InvoiceRefByAddrBlankHtlc(addr [32]byte) InvoiceRef {
return InvoiceRef{
payAddr: &addr,
refModifier: HtlcSetBlankModifier,
}
}
// InvoiceRefBySetID creates an InvoiceRef that queries the set id index for an
// invoice with the provided setID. If the invoice is not found, the query will
// not fallback to payHash or payAddr.
@ -291,6 +325,16 @@ func InvoiceRefBySetID(setID [32]byte) InvoiceRef {
}
}
// InvoiceRefBySetIDFiltered is similar to the InvoiceRefBySetID identifier,
// but it specifies that the returned set of HTLCs should be filtered to only
// include HTLCs that are part of that set.
func InvoiceRefBySetIDFiltered(setID [32]byte) InvoiceRef {
return InvoiceRef{
setID: &setID,
refModifier: HtlcSetOnlyModifier,
}
}
// PayHash returns the optional payment hash of the target invoice.
//
// NOTE: This value may be nil.
@ -324,6 +368,12 @@ func (r InvoiceRef) SetID() *[32]byte {
return nil
}
// Modifier defines the set of available modifications to the base invoice ref
// look up that are available.
func (r InvoiceRef) Modifier() RefModifier {
return r.refModifier
}
// String returns a human-readable representation of an InvoiceRef.
func (r InvoiceRef) String() string {
var ids []string
@ -775,6 +825,11 @@ type InvoiceUpdateDesc struct {
// AddHtlcs describes the newly accepted htlcs that need to be added to
// the invoice.
AddHtlcs map[CircuitKey]*HtlcAcceptDesc
// SetID is an optional set ID for AMP invoices that allows operations
// to be more efficient by ensuring we don't need to read out the
// entire HTLC set each timee an HTLC is to be cancelled.
SetID *SetID
}
// InvoiceStateUpdateDesc describes an invoice-level state transition.
@ -1023,9 +1078,25 @@ func (d *DB) LookupInvoice(ref InvoiceRef) (Invoice, error) {
return err
}
var setID *SetID
switch {
// If this is a payment address ref, and the blank modified was
// specified, then we'll use the zero set ID to indicate that
// we won't want any HTLCs returned.
case ref.PayAddr() != nil && ref.Modifier() == HtlcSetBlankModifier:
var zeroSetID SetID
setID = &zeroSetID
// If this is a set ID ref, and the htlc set only modified was
// specified, then we'll pass through the specified setID so
// only that will be returned.
case ref.SetID() != nil && ref.Modifier() == HtlcSetOnlyModifier:
setID = (*SetID)(ref.SetID())
}
// An invoice was found, retrieve the remainder of the invoice
// body.
i, err := fetchInvoice(invoiceNum, invoices)
i, err := fetchInvoice(invoiceNum, invoices, setID)
if err != nil {
return err
}
@ -1147,6 +1218,7 @@ func (d *DB) ScanInvoices(
return nil
}
// Skip sub-buckets.
if v == nil {
return nil
}
@ -1304,7 +1376,7 @@ func (d *DB) QueryInvoices(q InvoiceQuery) (InvoiceSlice, error) {
// The update is performed inside the same database transaction that fetches the
// invoice and is therefore atomic. The fields to update are controlled by the
// supplied callback.
func (d *DB) UpdateInvoice(ref InvoiceRef,
func (d *DB) UpdateInvoice(ref InvoiceRef, setIDHint *SetID,
callback InvoiceUpdateCallback) (*Invoice, error) {
var updatedInvoice *Invoice
@ -1339,7 +1411,7 @@ func (d *DB) UpdateInvoice(ref InvoiceRef,
payHash := ref.PayHash()
updatedInvoice, err = d.updateInvoice(
payHash, invoices, settleIndex, setIDIndex,
payHash, setIDHint, invoices, settleIndex, setIDIndex,
invoiceNum, callback,
)
@ -1819,7 +1891,6 @@ func fetchInvoice(invoiceNum []byte, invoices kvdb.RBucket, setIDs ...*SetID) (I
invoiceReader := bytes.NewReader(invoiceBytes)
return deserializeInvoice(invoiceReader)
invoice, err := deserializeInvoice(invoiceReader)
if err != nil {
return Invoice{}, err
@ -2538,11 +2609,19 @@ func settleHtlcsAmp(invoice *Invoice,
// updateInvoice fetches the invoice, obtains the update descriptor from the
// callback and applies the updates in a single db transaction.
func (d *DB) updateInvoice(hash *lntypes.Hash, invoices,
func (d *DB) updateInvoice(hash *lntypes.Hash, refSetID *SetID, invoices,
settleIndex, setIDIndex kvdb.RwBucket, invoiceNum []byte,
callback InvoiceUpdateCallback) (*Invoice, error) {
invoice, err := fetchInvoice(invoiceNum, invoices)
// If the set ID is non-nil, then we'll use that to filter out the
// HTLCs for AMP invoice so we don't need to read them all out to
// satisfy the invoice callback below. If it's nil, then we pass in the
// zero set ID which means no HTLCs will be read out.
var invSetID SetID
if refSetID != nil {
invSetID = *refSetID
}
invoice, err := fetchInvoice(invoiceNum, invoices, &invSetID)
if err != nil {
return nil, err
}
@ -2573,6 +2652,11 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices,
if update.State != nil {
setID = update.State.SetID
newState = update.State.NewState
} else if update.SetID != nil {
// When we go to cancel HTLCs, there's no new state, but the
// set of HTLCs to be cancelled along with the setID affected
// will be passed in.
setID = (*[32]byte)(update.SetID)
}
now := d.clock.Now()
@ -2741,7 +2825,6 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices,
// identical.
case ok && *htlc.AMP.Preimage != preimage:
return nil, ErrHTLCPreimageAlreadyExists
}
}

View File

@ -688,7 +688,7 @@ func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef channeldb.InvoiceRef,
// Intercept the update descriptor to set the local updated variable. If
// no invoice update is performed, we can return early.
var updated bool
invoice, err := i.cdb.UpdateInvoice(invoiceRef,
invoice, err := i.cdb.UpdateInvoice(invoiceRef, nil,
func(invoice *channeldb.Invoice) (
*channeldb.InvoiceUpdateDesc, error) {
@ -979,6 +979,7 @@ func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
)
invoice, err := i.cdb.UpdateInvoice(
ctx.invoiceRef(),
(*channeldb.SetID)(ctx.setID()),
func(inv *channeldb.Invoice) (
*channeldb.InvoiceUpdateDesc, error) {
@ -1181,7 +1182,7 @@ func (i *InvoiceRegistry) SettleHodlInvoice(preimage lntypes.Preimage) error {
hash := preimage.Hash()
invoiceRef := channeldb.InvoiceRefByHash(hash)
invoice, err := i.cdb.UpdateInvoice(invoiceRef, updateInvoice)
invoice, err := i.cdb.UpdateInvoice(invoiceRef, nil, updateInvoice)
if err != nil {
log.Errorf("SettleHodlInvoice with preimage %v: %v",
preimage, err)
@ -1265,7 +1266,7 @@ func (i *InvoiceRegistry) cancelInvoiceImpl(payHash lntypes.Hash,
}
invoiceRef := channeldb.InvoiceRefByHash(payHash)
invoice, err := i.cdb.UpdateInvoice(invoiceRef, updateInvoice)
invoice, err := i.cdb.UpdateInvoice(invoiceRef, nil, updateInvoice)
// Implement idempotency by returning success if the invoice was already
// canceled.