mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-28 22:50:58 +02:00
routing: always update payment in the same goroutine
This commit refactors `collectResultAsync` such that this method is now only responsible for collecting results from the switch. The method `decideNextStep` is expanded to process these results in the same goroutine where we fetch the payment from db, to make sure the lifecycle loop always have a consistent view of a given payment.
This commit is contained in:
@@ -24,6 +24,17 @@ import (
|
||||
// the payment lifecycle is exiting .
|
||||
var ErrPaymentLifecycleExiting = errors.New("payment lifecycle exiting")
|
||||
|
||||
// switchResult is the result sent back from the switch after processing the
|
||||
// HTLC.
|
||||
type switchResult struct {
|
||||
// attempt is the HTLC sent to the switch.
|
||||
attempt *channeldb.HTLCAttempt
|
||||
|
||||
// result is sent from the switch which contains either a preimage if
|
||||
// ths HTLC is settled or an error if it's failed.
|
||||
result *htlcswitch.PaymentResult
|
||||
}
|
||||
|
||||
// paymentLifecycle holds all information about the current state of a payment
|
||||
// needed to resume if from any point.
|
||||
type paymentLifecycle struct {
|
||||
@@ -39,11 +50,9 @@ type paymentLifecycle struct {
|
||||
// to stop.
|
||||
quit chan struct{}
|
||||
|
||||
// resultCollected is used to signal that the result of an attempt has
|
||||
// been collected. A nil error means the attempt is either successful
|
||||
// or failed with temporary error. Otherwise, we should exit the
|
||||
// lifecycle loop as a terminal error has occurred.
|
||||
resultCollected chan error
|
||||
// resultCollected is used to send the result returned from the switch
|
||||
// for a given HTLC attempt.
|
||||
resultCollected chan *switchResult
|
||||
|
||||
// resultCollector is a function that is used to collect the result of
|
||||
// an HTLC attempt, which is always mounted to `p.collectResultAsync`
|
||||
@@ -66,7 +75,7 @@ func newPaymentLifecycle(r *ChannelRouter, feeLimit lnwire.MilliSatoshi,
|
||||
shardTracker: shardTracker,
|
||||
currentHeight: currentHeight,
|
||||
quit: make(chan struct{}),
|
||||
resultCollected: make(chan error, 1),
|
||||
resultCollected: make(chan *switchResult, 1),
|
||||
firstHopCustomRecords: firstHopCustomRecords,
|
||||
}
|
||||
|
||||
@@ -137,20 +146,27 @@ func (p *paymentLifecycle) decideNextStep(
|
||||
log.Tracef("Waiting for attempt results for payment %v",
|
||||
p.identifier)
|
||||
|
||||
// Otherwise we wait for one HTLC attempt then continue
|
||||
// the lifecycle.
|
||||
//
|
||||
// NOTE: we don't check `p.quit` since `decideNextStep` is
|
||||
// running in the same goroutine as `resumePayment`.
|
||||
// Otherwise we wait for the result for one HTLC attempt then
|
||||
// continue the lifecycle.
|
||||
select {
|
||||
case err := <-p.resultCollected:
|
||||
// If an error is returned, exit with it.
|
||||
case r := <-p.resultCollected:
|
||||
log.Tracef("Received attempt result for payment %v",
|
||||
p.identifier)
|
||||
|
||||
// Handle the result here. If there's no error, we will
|
||||
// return stepSkip and move to the next lifecycle
|
||||
// iteration, which will refresh the payment and wait
|
||||
// for the next attempt result, if any.
|
||||
_, err := p.handleAttemptResult(r.attempt, r.result)
|
||||
|
||||
// We would only get a DB-related error here, which will
|
||||
// cause us to abort the payment flow.
|
||||
if err != nil {
|
||||
return stepExit, err
|
||||
}
|
||||
|
||||
log.Tracef("Received attempt result for payment %v",
|
||||
p.identifier)
|
||||
case <-p.quit:
|
||||
return stepExit, ErrPaymentLifecycleExiting
|
||||
|
||||
case <-p.router.quit:
|
||||
return stepExit, ErrRouterShuttingDown
|
||||
@@ -430,50 +446,57 @@ type attemptResult struct {
|
||||
}
|
||||
|
||||
// collectResultAsync launches a goroutine that will wait for the result of the
|
||||
// given HTLC attempt to be available then handle its result. Once received, it
|
||||
// will send a nil error to channel `resultCollected` to indicate there's a
|
||||
// result.
|
||||
// given HTLC attempt to be available then save its result in a map. Once
|
||||
// received, it will send the result returned from the switch to channel
|
||||
// `resultCollected`.
|
||||
func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
|
||||
log.Debugf("Collecting result for attempt %v in payment %v",
|
||||
attempt.AttemptID, p.identifier)
|
||||
|
||||
go func() {
|
||||
// Block until the result is available.
|
||||
_, err := p.collectResult(attempt)
|
||||
result, err := p.collectResult(attempt)
|
||||
if err != nil {
|
||||
log.Errorf("Error collecting result for attempt %v "+
|
||||
"in payment %v: %v", attempt.AttemptID,
|
||||
log.Errorf("Error collecting result for attempt %v in "+
|
||||
"payment %v: %v", attempt.AttemptID,
|
||||
p.identifier, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("Result collected for attempt %v in payment %v",
|
||||
attempt.AttemptID, p.identifier)
|
||||
|
||||
// Once the result is collected, we signal it by writing the
|
||||
// error to `resultCollected`.
|
||||
// Create a switch result and send it to the resultCollected
|
||||
// chan, which gets processed when the lifecycle is waiting for
|
||||
// a result to be received in decideNextStep.
|
||||
r := &switchResult{
|
||||
attempt: attempt,
|
||||
result: result,
|
||||
}
|
||||
|
||||
// Signal that a result has been collected.
|
||||
select {
|
||||
// Send the signal or quit.
|
||||
case p.resultCollected <- err:
|
||||
// Send the result so decideNextStep can proceed.
|
||||
case p.resultCollected <- r:
|
||||
|
||||
case <-p.quit:
|
||||
log.Debugf("Lifecycle exiting while collecting "+
|
||||
"result for payment %v", p.identifier)
|
||||
|
||||
case <-p.router.quit:
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// collectResult waits for the result for the given attempt to be available
|
||||
// from the Switch, then records the attempt outcome with the control tower.
|
||||
// An attemptResult is returned, indicating the final outcome of this HTLC
|
||||
// attempt.
|
||||
func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
|
||||
*attemptResult, error) {
|
||||
// collectResult waits for the result of the given HTLC attempt to be sent by
|
||||
// the switch and returns it.
|
||||
func (p *paymentLifecycle) collectResult(
|
||||
attempt *channeldb.HTLCAttempt) (*htlcswitch.PaymentResult, error) {
|
||||
|
||||
log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt))
|
||||
|
||||
result := &htlcswitch.PaymentResult{}
|
||||
|
||||
// Regenerate the circuit for this attempt.
|
||||
circuit, err := attempt.Circuit()
|
||||
|
||||
@@ -489,8 +512,7 @@ func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
|
||||
if err != nil {
|
||||
log.Debugf("Unable to generate circuit for attempt %v: %v",
|
||||
attempt.AttemptID, err)
|
||||
|
||||
return p.failAttempt(attempt.AttemptID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Using the created circuit, initialize the error decrypter, so we can
|
||||
@@ -516,22 +538,21 @@ func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
|
||||
log.Errorf("Failed getting result for attemptID %d "+
|
||||
"from switch: %v", attempt.AttemptID, err)
|
||||
|
||||
return p.handleSwitchErr(attempt, err)
|
||||
result.Error = err
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// The switch knows about this payment, we'll wait for a result to be
|
||||
// available.
|
||||
var (
|
||||
result *htlcswitch.PaymentResult
|
||||
ok bool
|
||||
)
|
||||
|
||||
select {
|
||||
case result, ok = <-resultChan:
|
||||
case r, ok := <-resultChan:
|
||||
if !ok {
|
||||
return nil, htlcswitch.ErrSwitchExiting
|
||||
}
|
||||
|
||||
result = r
|
||||
|
||||
case <-p.quit:
|
||||
return nil, ErrPaymentLifecycleExiting
|
||||
|
||||
@@ -539,46 +560,7 @@ func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
|
||||
return nil, ErrRouterShuttingDown
|
||||
}
|
||||
|
||||
// In case of a payment failure, fail the attempt with the control
|
||||
// tower and return.
|
||||
if result.Error != nil {
|
||||
return p.handleSwitchErr(attempt, result.Error)
|
||||
}
|
||||
|
||||
// We successfully got a payment result back from the switch.
|
||||
log.Debugf("Payment %v succeeded with pid=%v",
|
||||
p.identifier, attempt.AttemptID)
|
||||
|
||||
// Report success to mission control.
|
||||
err = p.router.cfg.MissionControl.ReportPaymentSuccess(
|
||||
attempt.AttemptID, &attempt.Route,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Error reporting payment success to mc: %v", err)
|
||||
}
|
||||
|
||||
// In case of success we atomically store settle result to the DB move
|
||||
// the shard to the settled state.
|
||||
htlcAttempt, err := p.router.cfg.Control.SettleAttempt(
|
||||
p.identifier, attempt.AttemptID,
|
||||
&channeldb.HTLCSettleInfo{
|
||||
Preimage: result.Preimage,
|
||||
SettleTime: p.router.cfg.Clock.Now(),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Error settling attempt %v for payment %v with "+
|
||||
"preimage %v: %v", attempt.AttemptID, p.identifier,
|
||||
result.Preimage, err)
|
||||
|
||||
// We won't mark the attempt as failed since we already have
|
||||
// the preimage.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &attemptResult{
|
||||
attempt: htlcAttempt,
|
||||
}, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// registerAttempt is responsible for creating and saving an HTLC attempt in db
|
||||
@@ -1111,3 +1093,65 @@ func (p *paymentLifecycle) reloadPayment() (DBMPPayment,
|
||||
|
||||
return payment, ps, nil
|
||||
}
|
||||
|
||||
// handleAttemptResult processes the result of an HTLC attempt returned from
|
||||
// the htlcswitch.
|
||||
func (p *paymentLifecycle) handleAttemptResult(attempt *channeldb.HTLCAttempt,
|
||||
result *htlcswitch.PaymentResult) (*attemptResult, error) {
|
||||
|
||||
// If the result has an error, we need to further process it by failing
|
||||
// the attempt and maybe fail the payment.
|
||||
if result.Error != nil {
|
||||
return p.handleSwitchErr(attempt, result.Error)
|
||||
}
|
||||
|
||||
// We got an attempt settled result back from the switch.
|
||||
log.Debugf("Payment(%v): attempt(%v) succeeded", p.identifier,
|
||||
attempt.AttemptID)
|
||||
|
||||
// Report success to mission control.
|
||||
err := p.router.cfg.MissionControl.ReportPaymentSuccess(
|
||||
attempt.AttemptID, &attempt.Route,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Error reporting payment success to mc: %v", err)
|
||||
}
|
||||
|
||||
// In case of success we atomically store settle result to the DB and
|
||||
// move the shard to the settled state.
|
||||
htlcAttempt, err := p.router.cfg.Control.SettleAttempt(
|
||||
p.identifier, attempt.AttemptID,
|
||||
&channeldb.HTLCSettleInfo{
|
||||
Preimage: result.Preimage,
|
||||
SettleTime: p.router.cfg.Clock.Now(),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Error settling attempt %v for payment %v with "+
|
||||
"preimage %v: %v", attempt.AttemptID, p.identifier,
|
||||
result.Preimage, err)
|
||||
|
||||
// We won't mark the attempt as failed since we already have
|
||||
// the preimage.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &attemptResult{
|
||||
attempt: htlcAttempt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// collectAndHandleResult waits for the result for the given attempt to be
|
||||
// available from the Switch, then records the attempt outcome with the control
|
||||
// tower. An attemptResult is returned, indicating the final outcome of this
|
||||
// HTLC attempt.
|
||||
func (p *paymentLifecycle) collectAndHandleResult(
|
||||
attempt *channeldb.HTLCAttempt) (*attemptResult, error) {
|
||||
|
||||
result, err := p.collectResult(attempt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.handleAttemptResult(attempt, result)
|
||||
}
|
||||
|
Reference in New Issue
Block a user