From 5d20c02ea8c768b34089e587003805bc77afe0b4 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 24 Apr 2018 20:57:30 -0700 Subject: [PATCH] channeldb: add new add+settle index to invoice database In this commit, we add two new indexes to the invoice database: the add index, and the settle index. These to indexes essentially form a time series index on top of the existing primary index bucket. Each time an invoice is added, we'll advance the addIndex seqno, and then create a mapping from seqNo -> invoiceNum. Each time an invoice is settled, we'll do the same, but within the settle index. This change is required in order to allow callers to effectively seek into the current invoice database in order to obtain notifications for any invoices they may have missed out on while they were disconnected. This will allow us to implement robust streaming invoice notifications within lnd to ensure that clients never miss an event. --- channeldb/invoices.go | 177 +++++++++++++++++++++++++++++++++++------- 1 file changed, 147 insertions(+), 30 deletions(-) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 9d4a71721..85338a9c7 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -36,6 +36,26 @@ var ( // stored within the invoiceIndexBucket. Within the invoiceBucket // invoices are uniquely identified by the invoice ID. numInvoicesKey = []byte("nik") + + // addIndexBucket is an index bucket that we'll use to create a + // monotonically increasing set of add indexes. Each time we add a new + // invoice, this sequence number will be incremented and then populated + // within the new invoice. + // + // In addition to this sequence number, we map: + // + // addIndexNo => invoiceIndex + addIndexBucket = []byte("invoice-add-index") + + // settleIndexBucket is an index bucket that we'll use to create a + // monotonically increasing integer for tracking a "settle index". Each + // time an invoice is settled, this sequence number will be incremented + // as populate within the newly settled invoice. + // + // In addition to this sequence number, we map: + // + // settleIndexNo => invoiceIndex + settleIndexBucket = []byte("invoice-settle-index") ) const ( @@ -111,6 +131,25 @@ type Invoice struct { // TODO(roasbeef): later allow for multiple terms to fulfill the final // invoice: payment fragmentation, etc. Terms ContractTerm + + // AddIndex is an auto-incrementing integer that acts as a + // monotonically increasing sequence number for all invoices created. + // Clients can then use this field as a "checkpoint" of sorts when + // implementing a streaming RPC to notify consumers of instances where + // an invoice has been added before they re-connected. + // + // NOTE: This index starts at 1. + AddIndex uint64 + + // SettleIndex is an auto-incrementing integer that acts as a + // monotonically increasing sequence number for all settled invoices. + // Clients can then use this field as a "checkpoint" of sorts when + // implementing a streaming RPC to notify consumers of instances where + // an invoice has been settled before they re-connected. + // + // NOTE: This index starts at 1. + SettleIndex uint64 + // AmtPaid is the final amount that we ultimately accepted for pay for // this invoice. We specify this value independently as it's possible // that the invoice originally didn't specify an amount, or the sender @@ -140,24 +179,35 @@ func validateInvoice(i *Invoice) error { // has *any* payment hashes which already exists within the database, then the // insertion will be aborted and rejected due to the strict policy banning any // duplicate payment hashes. -func (d *DB) AddInvoice(i *Invoice) error { - if err := validateInvoice(i); err != nil { +func (d *DB) AddInvoice(newInvoice *Invoice) error { + if err := validateInvoice(newInvoice); err != nil { return err } - return d.Update(func(tx *bolt.Tx) error { + + err := d.Update(func(tx *bolt.Tx) error { invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err } - invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket) + invoiceIndex, err := invoices.CreateBucketIfNotExists( + invoiceIndexBucket, + ) + if err != nil { + return err + } + addIndex, err := invoices.CreateBucketIfNotExists( + addIndexBucket, + ) if err != nil { return err } // Ensure that an invoice an identical payment hash doesn't // already exist within the index. - paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:]) + paymentHash := sha256.Sum256( + newInvoice.Terms.PaymentPreimage[:], + ) if invoiceIndex.Get(paymentHash[:]) != nil { return ErrDuplicateInvoice } @@ -169,14 +219,24 @@ func (d *DB) AddInvoice(i *Invoice) error { if invoiceCounter == nil { var scratch [4]byte byteOrder.PutUint32(scratch[:], invoiceNum) - if err := invoiceIndex.Put(numInvoicesKey, scratch[:]); err != nil { + err := invoiceIndex.Put(numInvoicesKey, scratch[:]) + if err != nil { return nil } } else { invoiceNum = byteOrder.Uint32(invoiceCounter) } - return putInvoice(invoices, invoiceIndex, i, invoiceNum) + return putInvoice( + invoices, invoiceIndex, addIndex, newInvoice, invoiceNum, + ) + }) + if err != nil { + return err + } + + return err +} }) } @@ -186,8 +246,8 @@ func (d *DB) AddInvoice(i *Invoice) error { // full invoice is returned. Before setting the incoming HTLC, the values // SHOULD be checked to ensure the payer meets the agreed upon contractual // terms of the payment. -func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { - var invoice *Invoice +func (d *DB) LookupInvoice(paymentHash [32]byte) (Invoice, error) { + var invoice Invoice err := d.View(func(tx *bolt.Tx) error { invoices := tx.Bucket(invoiceBucket) if invoices == nil { @@ -216,7 +276,7 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { return nil }) if err != nil { - return nil, err + return invoice, err } return invoice, nil @@ -225,8 +285,8 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { // FetchAllInvoices returns all invoices currently stored within the database. // If the pendingOnly param is true, then only unsettled invoices will be // returned, skipping all invoices that are fully settled. -func (d *DB) FetchAllInvoices(pendingOnly bool) ([]*Invoice, error) { - var invoices []*Invoice +func (d *DB) FetchAllInvoices(pendingOnly bool) ([]Invoice, error) { + var invoices []Invoice err := d.View(func(tx *bolt.Tx) error { invoiceB := tx.Bucket(invoiceBucket) @@ -275,7 +335,15 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er if err != nil { return err } - invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket) + invoiceIndex, err := invoices.CreateBucketIfNotExists( + invoiceIndexBucket, + ) + if err != nil { + return err + } + settleIndex, err := invoices.CreateBucketIfNotExists( + settleIndexBucket, + ) if err != nil { return err } @@ -293,7 +361,6 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er }) } -func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket, i *Invoice, invoiceNum uint32) error { // Create the invoice key which is just the big-endian representation @@ -314,10 +381,30 @@ func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket, // identify if we can settle an incoming payment, and also to possibly // allow a single invoice to have multiple payment installations. paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:]) - if err := invoiceIndex.Put(paymentHash[:], invoiceKey[:]); err != nil { + err := invoiceIndex.Put(paymentHash[:], invoiceKey[:]) + if err != nil { return err } + // Next, we'll obtain the next add invoice index (sequence + // number), so we can properly place this invoice within this + // event stream. + nextAddSeqNo, err := addIndex.NextSequence() + if err != nil { + return err + } + + // With the next sequence obtained, we'll updating the event series in + // the add index bucket to map this current add counter to the index of + // this new invoice. + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo) + if err := addIndex.Put(seqNoBytes[:], invoiceKey[:]); err != nil { + return err + } + + i.AddIndex = nextAddSeqNo + // Finally, serialize the invoice itself to be written to the disk. var buf bytes.Buffer if err := serializeInvoice(&buf, i); err != nil { @@ -370,16 +457,23 @@ func serializeInvoice(w io.Writer, i *Invoice) error { return err } + if err := binary.Write(w, byteOrder, i.AddIndex); err != nil { + return err + } + if err := binary.Write(w, byteOrder, i.SettleIndex); err != nil { + return err + } if err := binary.Write(w, byteOrder, int64(i.AmtPaid)); err != nil { return err } + return nil } -func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) { +func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (Invoice, error) { invoiceBytes := invoices.Get(invoiceNum) if invoiceBytes == nil { - return nil, ErrInvoiceNotFound + return Invoice{}, ErrInvoiceNotFound } invoiceReader := bytes.NewReader(invoiceBytes) @@ -387,52 +481,60 @@ func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) { return deserializeInvoice(invoiceReader) } -func deserializeInvoice(r io.Reader) (*Invoice, error) { +func deserializeInvoice(r io.Reader) (Invoice, error) { var err error - invoice := &Invoice{} + invoice := Invoice{} // TODO(roasbeef): use read full everywhere invoice.Memo, err = wire.ReadVarBytes(r, 0, MaxMemoSize, "") if err != nil { - return nil, err + return invoice, err } invoice.Receipt, err = wire.ReadVarBytes(r, 0, MaxReceiptSize, "") if err != nil { - return nil, err + return invoice, err } invoice.PaymentRequest, err = wire.ReadVarBytes(r, 0, MaxPaymentRequestSize, "") if err != nil { - return nil, err + return invoice, err } birthBytes, err := wire.ReadVarBytes(r, 0, 300, "birth") if err != nil { - return nil, err + return invoice, err } if err := invoice.CreationDate.UnmarshalBinary(birthBytes); err != nil { - return nil, err + return invoice, err } settledBytes, err := wire.ReadVarBytes(r, 0, 300, "settled") if err != nil { - return nil, err + return invoice, err } if err := invoice.SettleDate.UnmarshalBinary(settledBytes); err != nil { - return nil, err + return invoice, err } if _, err := io.ReadFull(r, invoice.Terms.PaymentPreimage[:]); err != nil { - return nil, err + return invoice, err } var scratch [8]byte if _, err := io.ReadFull(r, scratch[:]); err != nil { - return nil, err + return invoice, err } invoice.Terms.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:])) if err := binary.Read(r, byteOrder, &invoice.Terms.Settled); err != nil { - return nil, err + return invoice, err + } + + if err := binary.Read(r, byteOrder, &invoice.AddIndex); err != nil { + return invoice, err + } + if err := binary.Read(r, byteOrder, &invoice.SettleIndex); err != nil { + return invoice, err + } if err := binary.Read(r, byteOrder, &invoice.AmtPaid); err != nil { return invoice, err } @@ -454,12 +556,27 @@ func settleInvoice(invoices, settleIndex *bolt.Bucket, invoiceNum []byte, return nil } + // Now that we know the invoice hasn't already been settled, we'll + // update the settle index so we can place this settle event in the + // proper location within our time series. + nextSettleSeqNo, err := settleIndex.NextSequence() + if err != nil { + return err + } + + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo) + if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil { + return err + } + invoice.AmtPaid = amtPaid invoice.Terms.Settled = true invoice.SettleDate = time.Now() + invoice.SettleIndex = nextSettleSeqNo var buf bytes.Buffer - if err := serializeInvoice(&buf, invoice); err != nil { + if err := serializeInvoice(&buf, &invoice); err != nil { return nil }