mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-06-02 11:09:38 +02:00
htlcswitch: remove synchronous link handoff, special-case keystone err
This allows Switch-initiated payments to be failed back if they don't make it into a commitment. Prior to this commit, a Switch-initiated HTLC could get "lost" meaning the circuit wouldn't get deleted except if conditions were "right" and the network result store would never be made aware of the HTLC's fate. Switch-initiated HTLC's are now passed to the link's mailbox to ensure they can be failed back. This change also special-cases the ErrDuplicateKeystone error from OpenCircuits(...) so that callers of updateCommitTx() in the link don't send an Error to the peer if they encounter the keystone error. With the first async change, the keystone error should now always be recoverable.
This commit is contained in:
parent
c2adb03e38
commit
87a486f1f9
@ -52,10 +52,6 @@ type packetHandler interface {
|
|||||||
//
|
//
|
||||||
// NOTE: This function should block as little as possible.
|
// NOTE: This function should block as little as possible.
|
||||||
handleSwitchPacket(*htlcPacket) error
|
handleSwitchPacket(*htlcPacket) error
|
||||||
|
|
||||||
// handleLocalAddPacket handles a locally-initiated UpdateAddHTLC
|
|
||||||
// packet. It will be processed synchronously.
|
|
||||||
handleLocalAddPacket(*htlcPacket) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dustHandler is an interface used exclusively by the Switch to evaluate
|
// dustHandler is an interface used exclusively by the Switch to evaluate
|
||||||
|
@ -296,14 +296,6 @@ type ChannelLinkConfig struct {
|
|||||||
HtlcNotifier htlcNotifier
|
HtlcNotifier htlcNotifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// localUpdateAddMsg contains a locally initiated htlc and a channel that will
|
|
||||||
// receive the outcome of the link processing. This channel must be buffered to
|
|
||||||
// prevent the link from blocking.
|
|
||||||
type localUpdateAddMsg struct {
|
|
||||||
pkt *htlcPacket
|
|
||||||
err chan error
|
|
||||||
}
|
|
||||||
|
|
||||||
// shutdownReq contains an error channel that will be used by the channelLink
|
// shutdownReq contains an error channel that will be used by the channelLink
|
||||||
// to send an error if shutdown failed. If shutdown succeeded, the channel will
|
// to send an error if shutdown failed. If shutdown succeeded, the channel will
|
||||||
// be closed.
|
// be closed.
|
||||||
@ -371,10 +363,6 @@ type channelLink struct {
|
|||||||
// by the HTLC switch.
|
// by the HTLC switch.
|
||||||
downstream chan *htlcPacket
|
downstream chan *htlcPacket
|
||||||
|
|
||||||
// localUpdateAdd is a channel to which locally initiated HTLCs are
|
|
||||||
// sent across.
|
|
||||||
localUpdateAdd chan *localUpdateAddMsg
|
|
||||||
|
|
||||||
// shutdownRequest is a channel that the channelLink will listen on to
|
// shutdownRequest is a channel that the channelLink will listen on to
|
||||||
// service shutdown requests from ShutdownIfChannelClean calls.
|
// service shutdown requests from ShutdownIfChannelClean calls.
|
||||||
shutdownRequest chan *shutdownReq
|
shutdownRequest chan *shutdownReq
|
||||||
@ -428,7 +416,6 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
|||||||
hodlQueue: queue.NewConcurrentQueue(10),
|
hodlQueue: queue.NewConcurrentQueue(10),
|
||||||
log: build.NewPrefixLog(logPrefix, log),
|
log: build.NewPrefixLog(logPrefix, log),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
localUpdateAdd: make(chan *localUpdateAddMsg),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1057,7 +1044,21 @@ func (l *channelLink) htlcManager() {
|
|||||||
// the channel is not pending, otherwise we should have no htlcs to
|
// the channel is not pending, otherwise we should have no htlcs to
|
||||||
// reforward.
|
// reforward.
|
||||||
if l.ShortChanID() != hop.Source {
|
if l.ShortChanID() != hop.Source {
|
||||||
if err := l.resolveFwdPkgs(); err != nil {
|
err := l.resolveFwdPkgs()
|
||||||
|
switch err {
|
||||||
|
// No error was encountered, success.
|
||||||
|
case nil:
|
||||||
|
|
||||||
|
// If the duplicate keystone error was encountered, we'll fail
|
||||||
|
// without sending an Error message to the peer.
|
||||||
|
case ErrDuplicateKeystone:
|
||||||
|
l.fail(LinkFailureError{code: ErrCircuitError},
|
||||||
|
"temporary circuit error: %v", err)
|
||||||
|
return
|
||||||
|
|
||||||
|
// A non-nil error was encountered, send an Error message to
|
||||||
|
// the peer.
|
||||||
|
default:
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to resolve fwd pkgs: %v", err)
|
"unable to resolve fwd pkgs: %v", err)
|
||||||
return
|
return
|
||||||
@ -1180,10 +1181,6 @@ func (l *channelLink) htlcManager() {
|
|||||||
case pkt := <-l.downstream:
|
case pkt := <-l.downstream:
|
||||||
l.handleDownstreamPkt(pkt)
|
l.handleDownstreamPkt(pkt)
|
||||||
|
|
||||||
// A message containing a locally initiated add was received.
|
|
||||||
case msg := <-l.localUpdateAdd:
|
|
||||||
msg.err <- l.handleDownstreamUpdateAdd(msg.pkt)
|
|
||||||
|
|
||||||
// A message from the connected peer was just received. This
|
// A message from the connected peer was just received. This
|
||||||
// indicates that we have a new incoming HTLC, either directly
|
// indicates that we have a new incoming HTLC, either directly
|
||||||
// for us, or part of a multi-hop HTLC circuit.
|
// for us, or part of a multi-hop HTLC circuit.
|
||||||
@ -1195,12 +1192,27 @@ func (l *channelLink) htlcManager() {
|
|||||||
case hodlItem := <-l.hodlQueue.ChanOut():
|
case hodlItem := <-l.hodlQueue.ChanOut():
|
||||||
htlcResolution := hodlItem.(invoices.HtlcResolution)
|
htlcResolution := hodlItem.(invoices.HtlcResolution)
|
||||||
err := l.processHodlQueue(htlcResolution)
|
err := l.processHodlQueue(htlcResolution)
|
||||||
if err != nil {
|
switch err {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
// No error, success.
|
||||||
fmt.Sprintf("process hodl queue: %v",
|
case nil:
|
||||||
err.Error()),
|
|
||||||
|
// If the duplicate keystone error was encountered,
|
||||||
|
// fail back gracefully.
|
||||||
|
case ErrDuplicateKeystone:
|
||||||
|
l.fail(LinkFailureError{code: ErrCircuitError},
|
||||||
|
fmt.Sprintf("process hodl queue: "+
|
||||||
|
"temporary circuit error: %v",
|
||||||
|
err,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Send an Error message to the peer.
|
||||||
|
default:
|
||||||
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
|
fmt.Sprintf("process hodl queue: "+
|
||||||
|
"unable to update commitment:"+
|
||||||
|
" %v", err),
|
||||||
)
|
)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case req := <-l.shutdownRequest:
|
case req := <-l.shutdownRequest:
|
||||||
@ -1259,7 +1271,7 @@ loop:
|
|||||||
|
|
||||||
// Update the commitment tx.
|
// Update the commitment tx.
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
return fmt.Errorf("unable to update commitment: %v", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -2081,7 +2093,21 @@ func (l *channelLink) ackDownStreamPackets() error {
|
|||||||
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
|
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
|
||||||
// the link.
|
// the link.
|
||||||
func (l *channelLink) updateCommitTxOrFail() bool {
|
func (l *channelLink) updateCommitTxOrFail() bool {
|
||||||
if err := l.updateCommitTx(); err != nil {
|
err := l.updateCommitTx()
|
||||||
|
switch err {
|
||||||
|
// No error encountered, success.
|
||||||
|
case nil:
|
||||||
|
|
||||||
|
// A duplicate keystone error should be resolved and is not fatal, so
|
||||||
|
// we won't send an Error message to the peer.
|
||||||
|
case ErrDuplicateKeystone:
|
||||||
|
l.fail(LinkFailureError{code: ErrCircuitError},
|
||||||
|
"temporary circuit error: %v", err)
|
||||||
|
return false
|
||||||
|
|
||||||
|
// Any other error is treated results in an Error message being sent to
|
||||||
|
// the peer.
|
||||||
|
default:
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to update commitment: %v", err)
|
"unable to update commitment: %v", err)
|
||||||
return false
|
return false
|
||||||
@ -2099,6 +2125,8 @@ func (l *channelLink) updateCommitTx() error {
|
|||||||
// sign a commitment state.
|
// sign a commitment state.
|
||||||
err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
|
err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// If ErrDuplicateKeystone is returned, the caller will catch
|
||||||
|
// it.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2568,33 +2596,6 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
|
|||||||
return l.mailBox.AddPacket(pkt)
|
return l.mailBox.AddPacket(pkt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It
|
|
||||||
// will be processed synchronously.
|
|
||||||
//
|
|
||||||
// NOTE: Part of the packetHandler interface.
|
|
||||||
func (l *channelLink) handleLocalAddPacket(pkt *htlcPacket) error {
|
|
||||||
l.log.Tracef("received switch packet outkey=%v", pkt.outKey())
|
|
||||||
|
|
||||||
// Create a buffered result channel to prevent the link from blocking.
|
|
||||||
errChan := make(chan error, 1)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case l.localUpdateAdd <- &localUpdateAddMsg{
|
|
||||||
pkt: pkt,
|
|
||||||
err: errChan,
|
|
||||||
}:
|
|
||||||
case <-l.quit:
|
|
||||||
return ErrLinkShuttingDown
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-errChan:
|
|
||||||
return err
|
|
||||||
case <-l.quit:
|
|
||||||
return ErrLinkShuttingDown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
|
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
|
||||||
// to us from remote peer we have a channel with.
|
// to us from remote peer we have a channel with.
|
||||||
//
|
//
|
||||||
|
@ -46,6 +46,11 @@ const (
|
|||||||
// remote party to force close the channel out on chain now as a
|
// remote party to force close the channel out on chain now as a
|
||||||
// result.
|
// result.
|
||||||
ErrRecoveryError
|
ErrRecoveryError
|
||||||
|
|
||||||
|
// ErrCircuitError indicates a duplicate keystone error was hit in the
|
||||||
|
// circuit map. This is non-fatal and will resolve itself (usually
|
||||||
|
// within several minutes).
|
||||||
|
ErrCircuitError
|
||||||
)
|
)
|
||||||
|
|
||||||
// LinkFailureError encapsulates an error that will make us fail the current
|
// LinkFailureError encapsulates an error that will make us fail the current
|
||||||
@ -94,6 +99,8 @@ func (e LinkFailureError) Error() string {
|
|||||||
return "invalid revocation"
|
return "invalid revocation"
|
||||||
case ErrRecoveryError:
|
case ErrRecoveryError:
|
||||||
return "unable to resume channel, recovery required"
|
return "unable to resume channel, recovery required"
|
||||||
|
case ErrCircuitError:
|
||||||
|
return "non-fatal circuit map error"
|
||||||
default:
|
default:
|
||||||
return "unknown error"
|
return "unknown error"
|
||||||
}
|
}
|
||||||
|
@ -729,11 +729,6 @@ func (f *mockChannelLink) handleSwitchPacket(pkt *htlcPacket) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *mockChannelLink) handleLocalAddPacket(pkt *htlcPacket) error {
|
|
||||||
_ = f.mailBox.AddPacket(pkt)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *mockChannelLink) getDustSum(remote bool) lnwire.MilliSatoshi {
|
func (f *mockChannelLink) getDustSum(remote bool) lnwire.MilliSatoshi {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -484,6 +484,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
|
|||||||
incomingHTLCID: attemptID,
|
incomingHTLCID: attemptID,
|
||||||
outgoingChanID: firstHop,
|
outgoingChanID: firstHop,
|
||||||
htlc: htlc,
|
htlc: htlc,
|
||||||
|
amount: htlc.Amount,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to fetch the target link before creating a circuit so that
|
// Attempt to fetch the target link before creating a circuit so that
|
||||||
@ -547,10 +548,11 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
|
|||||||
return ErrLocalAddFailed
|
return ErrLocalAddFailed
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send packet to link.
|
// Give the packet to the link's mailbox so that HTLC's are properly
|
||||||
|
// canceled back if the mailbox timeout elapses.
|
||||||
packet.circuit = circuit
|
packet.circuit = circuit
|
||||||
|
|
||||||
return link.handleLocalAddPacket(packet)
|
return link.handleSwitchPacket(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateForwardingPolicies sends a message to the switch to update the
|
// UpdateForwardingPolicies sends a message to the switch to update the
|
||||||
|
@ -3600,8 +3600,6 @@ func TestSwitchDustForwarding(t *testing.T) {
|
|||||||
|
|
||||||
// We'll test that once the default threshold is exceeded on the
|
// We'll test that once the default threshold is exceeded on the
|
||||||
// Alice -> Bob channel, either side's calls to SendHTLC will fail.
|
// Alice -> Bob channel, either side's calls to SendHTLC will fail.
|
||||||
// This does not rely on the mailbox sum since there's no intermediate
|
|
||||||
// hop.
|
|
||||||
//
|
//
|
||||||
// Alice will send 357 HTLC's of 700sats. Bob will also send 357 HTLC's
|
// Alice will send 357 HTLC's of 700sats. Bob will also send 357 HTLC's
|
||||||
// of 700sats. If either side attempts to send a dust HTLC, it will
|
// of 700sats. If either side attempts to send a dust HTLC, it will
|
||||||
@ -3630,6 +3628,47 @@ func TestSwitchDustForwarding(t *testing.T) {
|
|||||||
OnionBlob: blob,
|
OnionBlob: blob,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkAlmostDust := func(link *channelLink, mbox MailBox,
|
||||||
|
remote bool) bool {
|
||||||
|
|
||||||
|
timeout := time.After(15 * time.Second)
|
||||||
|
pollInterval := 300 * time.Millisecond
|
||||||
|
expectedDust := 357 * 2 * amt
|
||||||
|
|
||||||
|
for {
|
||||||
|
<-time.After(pollInterval)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
linkDust := link.getDustSum(remote)
|
||||||
|
localMailDust, remoteMailDust := mbox.DustPackets()
|
||||||
|
|
||||||
|
totalDust := linkDust
|
||||||
|
if remote {
|
||||||
|
totalDust += remoteMailDust
|
||||||
|
} else {
|
||||||
|
totalDust += localMailDust
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalDust == expectedDust {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until Bob is almost at the dust threshold.
|
||||||
|
bobMbox := n.bobServer.htlcSwitch.mailOrchestrator.GetOrCreateMailBox(
|
||||||
|
n.firstBobChannelLink.ChanID(),
|
||||||
|
n.firstBobChannelLink.ShortChanID(),
|
||||||
|
)
|
||||||
|
require.True(t, checkAlmostDust(n.firstBobChannelLink, bobMbox, false))
|
||||||
|
|
||||||
// Assert that the HTLC is failed due to the dust threshold.
|
// Assert that the HTLC is failed due to the dust threshold.
|
||||||
err = n.bobServer.htlcSwitch.SendHTLC(
|
err = n.bobServer.htlcSwitch.SendHTLC(
|
||||||
aliceBobFirstHop, uint64(357), failingHtlc,
|
aliceBobFirstHop, uint64(357), failingHtlc,
|
||||||
@ -3723,6 +3762,14 @@ func TestSwitchDustForwarding(t *testing.T) {
|
|||||||
OnionBlob: blob,
|
OnionBlob: blob,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait until Alice's expected dust for the remote commitment is just
|
||||||
|
// under the dust threshold.
|
||||||
|
aliceOrch := n.aliceServer.htlcSwitch.mailOrchestrator
|
||||||
|
aliceMbox := aliceOrch.GetOrCreateMailBox(
|
||||||
|
n.aliceChannelLink.ChanID(), n.aliceChannelLink.ShortChanID(),
|
||||||
|
)
|
||||||
|
require.True(t, checkAlmostDust(n.aliceChannelLink, aliceMbox, true))
|
||||||
|
|
||||||
err = n.aliceServer.htlcSwitch.SendHTLC(
|
err = n.aliceServer.htlcSwitch.SendHTLC(
|
||||||
n.aliceChannelLink.ShortChanID(), uint64(357),
|
n.aliceChannelLink.ShortChanID(), uint64(357),
|
||||||
aliceMultihopHtlc,
|
aliceMultihopHtlc,
|
||||||
@ -3792,8 +3839,17 @@ func sendDustHtlcs(t *testing.T, n *threeHopNetwork, alice bool,
|
|||||||
OnionBlob: blob,
|
OnionBlob: blob,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sendingSwitch.SendHTLC(sid, attemptID, htlc)
|
for {
|
||||||
require.NoError(t, err)
|
// It may be the case that the dust threshold is hit
|
||||||
|
// before all 357*2 HTLC's are sent due to double
|
||||||
|
// counting. Get around this by continuing to send
|
||||||
|
// until successful.
|
||||||
|
err = sendingSwitch.SendHTLC(sid, attemptID, htlc)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
attemptID++
|
attemptID++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user