channeldb: add subscription to control tower

Allows other sub-systems to subscribe to payment success and fail
events.
This commit is contained in:
Joost Jager
2019-04-30 13:24:37 +02:00
parent 87d3207baf
commit eb2647e8fc
5 changed files with 512 additions and 23 deletions

View File

@@ -8,6 +8,7 @@ import (
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
)
var (
@@ -33,6 +34,10 @@ var (
// ErrUnknownPaymentStatus is returned when we do not recognize the
// existing state of a payment.
ErrUnknownPaymentStatus = errors.New("unknown payment status")
// errNoAttemptInfo is returned when no attempt info is stored yet.
errNoAttemptInfo = errors.New("unable to find attempt info for " +
"inflight payment")
)
// PaymentControl implements persistence for payments and payment attempts.
@@ -187,9 +192,12 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
// duplicate payments to the same payment hash. The provided preimage is
// atomically saved to the DB for record keeping.
func (p *PaymentControl) Success(paymentHash lntypes.Hash,
preimage lntypes.Preimage) error {
preimage lntypes.Preimage) (*route.Route, error) {
var updateErr error
var (
updateErr error
route *route.Route
)
err := p.db.Batch(func(tx *bbolt.Tx) error {
// Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
@@ -211,14 +219,26 @@ func (p *PaymentControl) Success(paymentHash lntypes.Hash,
// Record the successful payment info atomically to the
// payments record.
return bucket.Put(paymentSettleInfoKey, preimage[:])
err = bucket.Put(paymentSettleInfoKey, preimage[:])
if err != nil {
return err
}
// Retrieve attempt info for the notification.
attempt, err := fetchPaymentAttempt(bucket)
if err != nil {
return err
}
route = &attempt.Route
return nil
})
if err != nil {
return err
return nil, err
}
return updateErr
return route, updateErr
}
// Fail transitions a payment into the Failed state, and records the reason the
@@ -259,6 +279,28 @@ func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
return updateErr
}
// FetchPayment returns information about a payment from the database.
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
*Payment, error) {
var payment *Payment
err := p.db.View(func(tx *bbolt.Tx) error {
bucket, err := fetchPaymentBucket(tx, paymentHash)
if err != nil {
return err
}
payment, err = fetchPayment(bucket)
return err
})
if err != nil {
return nil, err
}
return payment, nil
}
// createPaymentBucket creates or fetches the sub-bucket assigned to this
// payment hash.
func createPaymentBucket(tx *bbolt.Tx, paymentHash lntypes.Hash) (
@@ -357,6 +399,17 @@ func ensureInFlight(bucket *bbolt.Bucket) error {
}
}
// fetchPaymentAttempt fetches the payment attempt from the bucket.
func fetchPaymentAttempt(bucket *bbolt.Bucket) (*PaymentAttemptInfo, error) {
attemptData := bucket.Get(paymentAttemptInfoKey)
if attemptData == nil {
return nil, errNoAttemptInfo
}
r := bytes.NewReader(attemptData)
return deserializePaymentAttemptInfo(r)
}
// InFlightPayment is a wrapper around a payment that has status InFlight.
type InFlightPayment struct {
// Info is the PaymentCreationInfo of the in-flight payment.
@@ -408,15 +461,11 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*InFlightPayment, error) {
return err
}
// Now get the attempt info, which may or may not be
// available.
attempt := bucket.Get(paymentAttemptInfoKey)
if attempt != nil {
r = bytes.NewReader(attempt)
inFlight.Attempt, err = deserializePaymentAttemptInfo(r)
if err != nil {
return err
}
// Now get the attempt info. It could be that there is
// no attempt info yet.
inFlight.Attempt, err = fetchPaymentAttempt(bucket)
if err != nil && err != errNoAttemptInfo {
return err
}
inFlights = append(inFlights, inFlight)