From f969d81e1a92d1c4e9869693d23c52c0ce1e53b5 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 14 Oct 2021 18:25:11 +0200 Subject: [PATCH] channeldb: for AMP, store set ID w/ invoice num in settle index This change allows us to deliver notifications to a user of all the settled recurring payments to the same payment_addr in the order that they occurred. --- channeldb/invoices.go | 152 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 143 insertions(+), 9 deletions(-) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index a98197587..0544ee651 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -1389,13 +1389,28 @@ func (d *DB) InvoicesSettledSince(sinceSettleIndex uint64) ([]Invoice, error) { // We'll seek to the starting index, then manually advance the // cursor in order to skip the entry with the since add index. invoiceCursor.Seek(startIndex[:]) - seqNo, invoiceKey := invoiceCursor.Next() + seqNo, indexValue := invoiceCursor.Next() - for ; seqNo != nil && bytes.Compare(seqNo, startIndex[:]) > 0; seqNo, invoiceKey = invoiceCursor.Next() { + for ; seqNo != nil && bytes.Compare(seqNo, startIndex[:]) > 0; seqNo, indexValue = invoiceCursor.Next() { + + // Depending on the length of the index value, this may + // or may not be an AMP invoice, so we'll extract the + // invoice value into two components: the invoice num, + // and the setID (may not be there). + var ( + invoiceKey [4]byte + setID *SetID + ) + + valueLen := copy(invoiceKey[:], indexValue) + if len(indexValue) == invoiceSetIDKeyLen { + setID = new(SetID) + copy(setID[:], indexValue[valueLen:]) + } // For each key found, we'll look up the actual // invoice, then accumulate it into our return value. - invoice, err := fetchInvoice(invoiceKey, invoices) + invoice, err := fetchInvoice(invoiceKey[:], invoices, setID) if err != nil { return err } @@ -1844,6 +1859,48 @@ func fetchInvoice(invoiceNum []byte, invoices kvdb.RBucket, setIDs ...*SetID) (I return invoice, nil } +// fetchInvoiceStateAMP retrieves the state of all the relevant sub-invoice for +// an AMP invoice. This methods only decode the relevant state vs the entire +// invoice. +func fetchInvoiceStateAMP(invoiceNum []byte, + invoices kvdb.RBucket) (AMPInvoiceState, error) { + + // Fetch the raw invoice bytes. + invoiceBytes := invoices.Get(invoiceNum) + if invoiceBytes == nil { + return nil, ErrInvoiceNotFound + } + + r := bytes.NewReader(invoiceBytes) + + var bodyLen int64 + err := binary.Read(r, byteOrder, &bodyLen) + if err != nil { + return nil, err + } + + // Next, we'll make a new TLV stream that only attempts to decode the + // bytes we actually need. + ampState := make(AMPInvoiceState) + tlvStream, err := tlv.NewStream( + // Invoice AMP state. + tlv.MakeDynamicRecord( + invoiceAmpStateType, &State, nil, + ampStateEncoder, ampStateDecoder, + ), + ) + if err != nil { + return nil, err + } + + invoiceReader := io.LimitReader(r, bodyLen) + if err = tlvStream.Decode(invoiceReader); err != nil { + return nil, err + } + + return ampState, nil +} + func deserializeInvoice(r io.Reader) (Invoice, error) { var ( preimageBytes [32]byte @@ -2646,7 +2703,7 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices, // setSettleMetaFields. if !invoiceIsAMP && update.State.NewState == ContractSettled { err := setSettleMetaFields( - settleIndex, invoiceNum, &invoice, now, + settleIndex, invoiceNum, &invoice, now, nil, ) if err != nil { return nil, err @@ -2754,6 +2811,18 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices, invoice.AmtPaid += amtPaid } + // As we don't update the settle index above for AMP invoices, we'll do + // it here for each sub-AMP invoice that was settled. + for settledSetID := range settledSetIDs { + settledSetID := settledSetID + err := setSettleMetaFields( + settleIndex, invoiceNum, &invoice, now, &settledSetID, + ) + if err != nil { + return nil, err + } + } + // Reserialize and update invoice. var buf bytes.Buffer if err := serializeInvoice(&buf, &invoice); err != nil { @@ -2986,9 +3055,11 @@ func updateHtlc(resolveTime time.Time, htlc *InvoiceHTLC, } // setSettleMetaFields updates the metadata associated with settlement of an -// invoice. +// invoice. If a non-nil setID is passed in, then the value will be append to +// the invoice number as well, in order to allow us to detect repeated payments +// to the same AMP invoices "across time". func setSettleMetaFields(settleIndex kvdb.RwBucket, invoiceNum []byte, - invoice *Invoice, now time.Time) error { + invoice *Invoice, now time.Time, setID *SetID) error { // Now that we know the invoice hasn't already been settled, we'll // update the settle index so we can place this settle event in the @@ -2998,14 +3069,39 @@ func setSettleMetaFields(settleIndex kvdb.RwBucket, invoiceNum []byte, return err } + // Make a new byte array on the stack that can potentially store the 4 + // byte invoice number along w/ the 32 byte set ID. We capture valueLen + // here which is the number of bytes copied so we can only store the 4 + // bytes if this is a non-AMP invoice. + var indexKey [invoiceSetIDKeyLen]byte + valueLen := copy(indexKey[:], invoiceNum) + + if setID != nil { + valueLen += copy(indexKey[valueLen:], setID[:]) + } + var seqNoBytes [8]byte byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo) - if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil { + if err := settleIndex.Put(seqNoBytes[:], indexKey[:valueLen]); err != nil { return err } - invoice.SettleDate = now - invoice.SettleIndex = nextSettleSeqNo + // If the setID is nil, then this means that this is a non-AMP settle, + // so we'll update the invoice settle index directly. + if setID == nil { + invoice.SettleDate = now + invoice.SettleIndex = nextSettleSeqNo + } else { + // If the set ID isn't blank, we'll update the AMP state map + // which tracks when each of the setIDs associated with a given + // AMP invoice are settled. + ampState := invoice.AMPState[*setID] + + ampState.SettleDate = now + ampState.SettleIndex = nextSettleSeqNo + + invoice.AMPState[*setID] = ampState + } return nil } @@ -3036,6 +3132,38 @@ func delAMPInvoices(invoiceNum []byte, invoiceBucket kvdb.RwBucket) error { return nil } +// delAMPSettleIndex removes all the entries in the settle index associated +// with a given AMP invoice. +func delAMPSettleIndex(invoiceNum []byte, invoices, settleIndex kvdb.RwBucket) error { + // First, we need to grab the AMP invoice state to see if there's + // anything that we even need to delete. + ampState, err := fetchInvoiceStateAMP(invoiceNum, invoices) + if err != nil { + return err + } + + // If there's no AMP state at all (non-AMP invoice), then we can return early. + if len(ampState) == 0 { + return nil + } + + // Otherwise, we'll need to iterate and delete each settle index within + // the set of returned entries. + var settleIndexKey [8]byte + for _, subState := range ampState { + byteOrder.PutUint64( + settleIndexKey[:], subState.SettleIndex, + ) + + if err := settleIndex.Delete(settleIndexKey[:]); err != nil { + return err + } + } + + return nil +} + +// InvoiceDeleteRef holds a reference to an invoice to be deleted. type InvoiceDeleteRef struct { // PayHash is the payment hash of the target invoice. All invoices are // currently indexed by payment hash. @@ -3165,6 +3293,12 @@ func (d *DB) DeleteInvoice(invoicesToDelete []InvoiceDeleteRef) error { // this is an AMP invoice, then we'll also need to // delete the set HTLC set stored as a key prefix. For // non-AMP invoices, this'll be a noop. + err = delAMPSettleIndex( + invoiceKey, invoices, settleIndex, + ) + if err != nil { + return err + } err = delAMPInvoices(invoiceKey, invoices) if err != nil { return err