routerrpc: convert sendpayment to async

Modify the routerrpc SendPayment api to asynchronous. This allows
callers to pick up a payment after the rpc connection was lost or lnd
was restarted.
This commit is contained in:
Joost Jager
2019-03-22 10:21:25 +01:00
parent 07d289c14e
commit f03533c67a
8 changed files with 571 additions and 239 deletions

View File

@@ -10,15 +10,18 @@ import (
"os"
"path/filepath"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/macaroon-bakery.v2/bakery"
)
@@ -53,6 +56,10 @@ var (
Entity: "offchain",
Action: "write",
}},
"/routerrpc.Router/TrackPayment": {{
Entity: "offchain",
Action: "read",
}},
"/routerrpc.Router/EstimateRouteFee": {{
Entity: "offchain",
Action: "read",
@@ -185,23 +192,35 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
// payment, or cannot find a route that satisfies the constraints in the
// PaymentRequest, then an error will be returned. Otherwise, the payment
// pre-image, along with the final route will be returned.
func (s *Server) SendPayment(ctx context.Context,
req *PaymentRequest) (*PaymentResponse, error) {
func (s *Server) SendPayment(req *SendPaymentRequest,
stream Router_SendPaymentServer) error {
payment, err := s.cfg.RouterBackend.extractIntentFromSendRequest(req)
if err != nil {
return nil, err
return err
}
preImage, _, err := s.cfg.Router.SendPayment(payment)
err = s.cfg.Router.SendPaymentAsync(payment)
if err != nil {
return nil, err
// Transform user errors to grpc code.
if err == channeldb.ErrPaymentInFlight ||
err == channeldb.ErrAlreadyPaid {
log.Debugf("SendPayment async result for hash %x: %v",
payment.PaymentHash, err)
return status.Error(
codes.AlreadyExists, err.Error(),
)
}
log.Errorf("SendPayment async error for hash %x: %v",
payment.PaymentHash, err)
return err
}
return &PaymentResponse{
PayHash: payment.PaymentHash[:],
PreImage: preImage[:],
}, nil
return s.trackPayment(payment.PaymentHash, stream)
}
// EstimateRouteFee allows callers to obtain a lower bound w.r.t how much it
@@ -460,3 +479,89 @@ func (s *Server) QueryMissionControl(ctx context.Context,
return &response, nil
}
// TrackPayment returns a stream of payment state updates. The stream is
// closed when the payment completes.
func (s *Server) TrackPayment(request *TrackPaymentRequest,
stream Router_TrackPaymentServer) error {
paymentHash, err := lntypes.MakeHash(request.PaymentHash)
if err != nil {
return err
}
log.Debugf("TrackPayment called for payment %v", paymentHash)
return s.trackPayment(paymentHash, stream)
}
// trackPayment writes payment status updates to the provided stream.
func (s *Server) trackPayment(paymentHash lntypes.Hash,
stream Router_TrackPaymentServer) error {
// Subscribe to the outcome of this payment.
inFlight, resultChan, err := s.cfg.RouterBackend.Tower.SubscribePayment(
paymentHash,
)
switch {
case err == channeldb.ErrPaymentNotInitiated:
return status.Error(codes.NotFound, err.Error())
case err != nil:
return err
}
// If it is in flight, send a state update to the client. Payment status
// update streams are expected to always send the current payment state
// immediately.
if inFlight {
err = stream.Send(&PaymentStatus{
State: PaymentState_IN_FLIGHT,
})
if err != nil {
return err
}
}
// Wait for the outcome of the payment. For payments that have
// completed, the result should already be waiting on the channel.
select {
case result := <-resultChan:
// Marshall result to rpc type.
var status PaymentStatus
if result.Success {
log.Debugf("Payment %v successfully completed",
paymentHash)
status.State = PaymentState_SUCCEEDED
status.Preimage = result.Preimage[:]
status.Route = s.cfg.RouterBackend.MarshallRoute(
result.Route,
)
} else {
switch result.FailureReason {
case channeldb.FailureReasonTimeout:
status.State = PaymentState_FAILED_TIMEOUT
case channeldb.FailureReasonNoRoute:
status.State = PaymentState_FAILED_NO_ROUTE
default:
return errors.New("unknown failure reason")
}
}
// Send event to the client.
err = stream.Send(&status)
if err != nil {
return err
}
case <-stream.Context().Done():
log.Debugf("Payment status stream %v canceled", paymentHash)
return stream.Context().Err()
}
return nil
}