channeldb: for AMP, store set ID w/ invoice num in settle index

This change allows us to deliver notifications to a user of all the
settled recurring payments to the same payment_addr in the order that
they occurred.
This commit is contained in:
Olaoluwa Osuntokun
2021-10-14 18:25:11 +02:00
parent 3b28ad6d72
commit f969d81e1a

View File

@ -1389,13 +1389,28 @@ func (d *DB) InvoicesSettledSince(sinceSettleIndex uint64) ([]Invoice, error) {
// We'll seek to the starting index, then manually advance the
// cursor in order to skip the entry with the since add index.
invoiceCursor.Seek(startIndex[:])
seqNo, invoiceKey := invoiceCursor.Next()
seqNo, indexValue := invoiceCursor.Next()
for ; seqNo != nil && bytes.Compare(seqNo, startIndex[:]) > 0; seqNo, invoiceKey = invoiceCursor.Next() {
for ; seqNo != nil && bytes.Compare(seqNo, startIndex[:]) > 0; seqNo, indexValue = invoiceCursor.Next() {
// Depending on the length of the index value, this may
// or may not be an AMP invoice, so we'll extract the
// invoice value into two components: the invoice num,
// and the setID (may not be there).
var (
invoiceKey [4]byte
setID *SetID
)
valueLen := copy(invoiceKey[:], indexValue)
if len(indexValue) == invoiceSetIDKeyLen {
setID = new(SetID)
copy(setID[:], indexValue[valueLen:])
}
// For each key found, we'll look up the actual
// invoice, then accumulate it into our return value.
invoice, err := fetchInvoice(invoiceKey, invoices)
invoice, err := fetchInvoice(invoiceKey[:], invoices, setID)
if err != nil {
return err
}
@ -1844,6 +1859,48 @@ func fetchInvoice(invoiceNum []byte, invoices kvdb.RBucket, setIDs ...*SetID) (I
return invoice, nil
}
// fetchInvoiceStateAMP retrieves the state of all the relevant sub-invoice for
// an AMP invoice. This methods only decode the relevant state vs the entire
// invoice.
func fetchInvoiceStateAMP(invoiceNum []byte,
invoices kvdb.RBucket) (AMPInvoiceState, error) {
// Fetch the raw invoice bytes.
invoiceBytes := invoices.Get(invoiceNum)
if invoiceBytes == nil {
return nil, ErrInvoiceNotFound
}
r := bytes.NewReader(invoiceBytes)
var bodyLen int64
err := binary.Read(r, byteOrder, &bodyLen)
if err != nil {
return nil, err
}
// Next, we'll make a new TLV stream that only attempts to decode the
// bytes we actually need.
ampState := make(AMPInvoiceState)
tlvStream, err := tlv.NewStream(
// Invoice AMP state.
tlv.MakeDynamicRecord(
invoiceAmpStateType, &ampState, nil,
ampStateEncoder, ampStateDecoder,
),
)
if err != nil {
return nil, err
}
invoiceReader := io.LimitReader(r, bodyLen)
if err = tlvStream.Decode(invoiceReader); err != nil {
return nil, err
}
return ampState, nil
}
func deserializeInvoice(r io.Reader) (Invoice, error) {
var (
preimageBytes [32]byte
@ -2646,7 +2703,7 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices,
// setSettleMetaFields.
if !invoiceIsAMP && update.State.NewState == ContractSettled {
err := setSettleMetaFields(
settleIndex, invoiceNum, &invoice, now,
settleIndex, invoiceNum, &invoice, now, nil,
)
if err != nil {
return nil, err
@ -2754,6 +2811,18 @@ func (d *DB) updateInvoice(hash *lntypes.Hash, invoices,
invoice.AmtPaid += amtPaid
}
// As we don't update the settle index above for AMP invoices, we'll do
// it here for each sub-AMP invoice that was settled.
for settledSetID := range settledSetIDs {
settledSetID := settledSetID
err := setSettleMetaFields(
settleIndex, invoiceNum, &invoice, now, &settledSetID,
)
if err != nil {
return nil, err
}
}
// Reserialize and update invoice.
var buf bytes.Buffer
if err := serializeInvoice(&buf, &invoice); err != nil {
@ -2986,9 +3055,11 @@ func updateHtlc(resolveTime time.Time, htlc *InvoiceHTLC,
}
// setSettleMetaFields updates the metadata associated with settlement of an
// invoice.
// invoice. If a non-nil setID is passed in, then the value will be append to
// the invoice number as well, in order to allow us to detect repeated payments
// to the same AMP invoices "across time".
func setSettleMetaFields(settleIndex kvdb.RwBucket, invoiceNum []byte,
invoice *Invoice, now time.Time) error {
invoice *Invoice, now time.Time, setID *SetID) error {
// Now that we know the invoice hasn't already been settled, we'll
// update the settle index so we can place this settle event in the
@ -2998,14 +3069,39 @@ func setSettleMetaFields(settleIndex kvdb.RwBucket, invoiceNum []byte,
return err
}
// Make a new byte array on the stack that can potentially store the 4
// byte invoice number along w/ the 32 byte set ID. We capture valueLen
// here which is the number of bytes copied so we can only store the 4
// bytes if this is a non-AMP invoice.
var indexKey [invoiceSetIDKeyLen]byte
valueLen := copy(indexKey[:], invoiceNum)
if setID != nil {
valueLen += copy(indexKey[valueLen:], setID[:])
}
var seqNoBytes [8]byte
byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil {
if err := settleIndex.Put(seqNoBytes[:], indexKey[:valueLen]); err != nil {
return err
}
invoice.SettleDate = now
invoice.SettleIndex = nextSettleSeqNo
// If the setID is nil, then this means that this is a non-AMP settle,
// so we'll update the invoice settle index directly.
if setID == nil {
invoice.SettleDate = now
invoice.SettleIndex = nextSettleSeqNo
} else {
// If the set ID isn't blank, we'll update the AMP state map
// which tracks when each of the setIDs associated with a given
// AMP invoice are settled.
ampState := invoice.AMPState[*setID]
ampState.SettleDate = now
ampState.SettleIndex = nextSettleSeqNo
invoice.AMPState[*setID] = ampState
}
return nil
}
@ -3036,6 +3132,38 @@ func delAMPInvoices(invoiceNum []byte, invoiceBucket kvdb.RwBucket) error {
return nil
}
// delAMPSettleIndex removes all the entries in the settle index associated
// with a given AMP invoice.
func delAMPSettleIndex(invoiceNum []byte, invoices, settleIndex kvdb.RwBucket) error {
// First, we need to grab the AMP invoice state to see if there's
// anything that we even need to delete.
ampState, err := fetchInvoiceStateAMP(invoiceNum, invoices)
if err != nil {
return err
}
// If there's no AMP state at all (non-AMP invoice), then we can return early.
if len(ampState) == 0 {
return nil
}
// Otherwise, we'll need to iterate and delete each settle index within
// the set of returned entries.
var settleIndexKey [8]byte
for _, subState := range ampState {
byteOrder.PutUint64(
settleIndexKey[:], subState.SettleIndex,
)
if err := settleIndex.Delete(settleIndexKey[:]); err != nil {
return err
}
}
return nil
}
// InvoiceDeleteRef holds a reference to an invoice to be deleted.
type InvoiceDeleteRef struct {
// PayHash is the payment hash of the target invoice. All invoices are
// currently indexed by payment hash.
@ -3165,6 +3293,12 @@ func (d *DB) DeleteInvoice(invoicesToDelete []InvoiceDeleteRef) error {
// this is an AMP invoice, then we'll also need to
// delete the set HTLC set stored as a key prefix. For
// non-AMP invoices, this'll be a noop.
err = delAMPSettleIndex(
invoiceKey, invoices, settleIndex,
)
if err != nil {
return err
}
err = delAMPInvoices(invoiceKey, invoices)
if err != nil {
return err