diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 1b80adab3..6ba8966bf 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -52,10 +52,6 @@ type packetHandler interface { // // NOTE: This function should block as little as possible. handleSwitchPacket(*htlcPacket) error - - // handleLocalAddPacket handles a locally-initiated UpdateAddHTLC - // packet. It will be processed synchronously. - handleLocalAddPacket(*htlcPacket) error } // dustHandler is an interface used exclusively by the Switch to evaluate diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 6c553e31c..626cfb67f 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -296,14 +296,6 @@ type ChannelLinkConfig struct { HtlcNotifier htlcNotifier } -// localUpdateAddMsg contains a locally initiated htlc and a channel that will -// receive the outcome of the link processing. This channel must be buffered to -// prevent the link from blocking. -type localUpdateAddMsg struct { - pkt *htlcPacket - err chan error -} - // shutdownReq contains an error channel that will be used by the channelLink // to send an error if shutdown failed. If shutdown succeeded, the channel will // be closed. @@ -371,10 +363,6 @@ type channelLink struct { // by the HTLC switch. downstream chan *htlcPacket - // localUpdateAdd is a channel to which locally initiated HTLCs are - // sent across. - localUpdateAdd chan *localUpdateAddMsg - // shutdownRequest is a channel that the channelLink will listen on to // service shutdown requests from ShutdownIfChannelClean calls. shutdownRequest chan *shutdownReq @@ -428,7 +416,6 @@ func NewChannelLink(cfg ChannelLinkConfig, hodlQueue: queue.NewConcurrentQueue(10), log: build.NewPrefixLog(logPrefix, log), quit: make(chan struct{}), - localUpdateAdd: make(chan *localUpdateAddMsg), } } @@ -1057,7 +1044,21 @@ func (l *channelLink) htlcManager() { // the channel is not pending, otherwise we should have no htlcs to // reforward. if l.ShortChanID() != hop.Source { - if err := l.resolveFwdPkgs(); err != nil { + err := l.resolveFwdPkgs() + switch err { + // No error was encountered, success. + case nil: + + // If the duplicate keystone error was encountered, we'll fail + // without sending an Error message to the peer. + case ErrDuplicateKeystone: + l.fail(LinkFailureError{code: ErrCircuitError}, + "temporary circuit error: %v", err) + return + + // A non-nil error was encountered, send an Error message to + // the peer. + default: l.fail(LinkFailureError{code: ErrInternalError}, "unable to resolve fwd pkgs: %v", err) return @@ -1180,10 +1181,6 @@ func (l *channelLink) htlcManager() { case pkt := <-l.downstream: l.handleDownstreamPkt(pkt) - // A message containing a locally initiated add was received. - case msg := <-l.localUpdateAdd: - msg.err <- l.handleDownstreamUpdateAdd(msg.pkt) - // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly // for us, or part of a multi-hop HTLC circuit. @@ -1195,12 +1192,27 @@ func (l *channelLink) htlcManager() { case hodlItem := <-l.hodlQueue.ChanOut(): htlcResolution := hodlItem.(invoices.HtlcResolution) err := l.processHodlQueue(htlcResolution) - if err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - fmt.Sprintf("process hodl queue: %v", - err.Error()), + switch err { + // No error, success. + case nil: + + // If the duplicate keystone error was encountered, + // fail back gracefully. + case ErrDuplicateKeystone: + l.fail(LinkFailureError{code: ErrCircuitError}, + fmt.Sprintf("process hodl queue: "+ + "temporary circuit error: %v", + err, + ), + ) + + // Send an Error message to the peer. + default: + l.fail(LinkFailureError{code: ErrInternalError}, + fmt.Sprintf("process hodl queue: "+ + "unable to update commitment:"+ + " %v", err), ) - return } case req := <-l.shutdownRequest: @@ -1259,7 +1271,7 @@ loop: // Update the commitment tx. if err := l.updateCommitTx(); err != nil { - return fmt.Errorf("unable to update commitment: %v", err) + return err } return nil @@ -2081,7 +2093,21 @@ func (l *channelLink) ackDownStreamPackets() error { // updateCommitTxOrFail updates the commitment tx and if that fails, it fails // the link. func (l *channelLink) updateCommitTxOrFail() bool { - if err := l.updateCommitTx(); err != nil { + err := l.updateCommitTx() + switch err { + // No error encountered, success. + case nil: + + // A duplicate keystone error should be resolved and is not fatal, so + // we won't send an Error message to the peer. + case ErrDuplicateKeystone: + l.fail(LinkFailureError{code: ErrCircuitError}, + "temporary circuit error: %v", err) + return false + + // Any other error is treated results in an Error message being sent to + // the peer. + default: l.fail(LinkFailureError{code: ErrInternalError}, "unable to update commitment: %v", err) return false @@ -2099,6 +2125,8 @@ func (l *channelLink) updateCommitTx() error { // sign a commitment state. err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...) if err != nil { + // If ErrDuplicateKeystone is returned, the caller will catch + // it. return err } @@ -2568,33 +2596,6 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error { return l.mailBox.AddPacket(pkt) } -// handleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It -// will be processed synchronously. -// -// NOTE: Part of the packetHandler interface. -func (l *channelLink) handleLocalAddPacket(pkt *htlcPacket) error { - l.log.Tracef("received switch packet outkey=%v", pkt.outKey()) - - // Create a buffered result channel to prevent the link from blocking. - errChan := make(chan error, 1) - - select { - case l.localUpdateAdd <- &localUpdateAddMsg{ - pkt: pkt, - err: errChan, - }: - case <-l.quit: - return ErrLinkShuttingDown - } - - select { - case err := <-errChan: - return err - case <-l.quit: - return ErrLinkShuttingDown - } -} - // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent // to us from remote peer we have a channel with. // diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go index 2f4be531e..1f454a7bb 100644 --- a/htlcswitch/linkfailure.go +++ b/htlcswitch/linkfailure.go @@ -46,6 +46,11 @@ const ( // remote party to force close the channel out on chain now as a // result. ErrRecoveryError + + // ErrCircuitError indicates a duplicate keystone error was hit in the + // circuit map. This is non-fatal and will resolve itself (usually + // within several minutes). + ErrCircuitError ) // LinkFailureError encapsulates an error that will make us fail the current @@ -94,6 +99,8 @@ func (e LinkFailureError) Error() string { return "invalid revocation" case ErrRecoveryError: return "unable to resume channel, recovery required" + case ErrCircuitError: + return "non-fatal circuit map error" default: return "unknown error" } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 2ae5e41d8..5acfeeb81 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -729,11 +729,6 @@ func (f *mockChannelLink) handleSwitchPacket(pkt *htlcPacket) error { return nil } -func (f *mockChannelLink) handleLocalAddPacket(pkt *htlcPacket) error { - _ = f.mailBox.AddPacket(pkt) - return nil -} - func (f *mockChannelLink) getDustSum(remote bool) lnwire.MilliSatoshi { return 0 } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d6af62acc..b6e3e2537 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -484,6 +484,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64, incomingHTLCID: attemptID, outgoingChanID: firstHop, htlc: htlc, + amount: htlc.Amount, } // Attempt to fetch the target link before creating a circuit so that @@ -547,10 +548,11 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64, return ErrLocalAddFailed } - // Send packet to link. + // Give the packet to the link's mailbox so that HTLC's are properly + // canceled back if the mailbox timeout elapses. packet.circuit = circuit - return link.handleLocalAddPacket(packet) + return link.handleSwitchPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 750061126..39ff5c2b0 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -3600,8 +3600,6 @@ func TestSwitchDustForwarding(t *testing.T) { // We'll test that once the default threshold is exceeded on the // Alice -> Bob channel, either side's calls to SendHTLC will fail. - // This does not rely on the mailbox sum since there's no intermediate - // hop. // // Alice will send 357 HTLC's of 700sats. Bob will also send 357 HTLC's // of 700sats. If either side attempts to send a dust HTLC, it will @@ -3630,6 +3628,47 @@ func TestSwitchDustForwarding(t *testing.T) { OnionBlob: blob, } + checkAlmostDust := func(link *channelLink, mbox MailBox, + remote bool) bool { + + timeout := time.After(15 * time.Second) + pollInterval := 300 * time.Millisecond + expectedDust := 357 * 2 * amt + + for { + <-time.After(pollInterval) + + select { + case <-timeout: + return false + default: + } + + linkDust := link.getDustSum(remote) + localMailDust, remoteMailDust := mbox.DustPackets() + + totalDust := linkDust + if remote { + totalDust += remoteMailDust + } else { + totalDust += localMailDust + } + + if totalDust == expectedDust { + break + } + } + + return true + } + + // Wait until Bob is almost at the dust threshold. + bobMbox := n.bobServer.htlcSwitch.mailOrchestrator.GetOrCreateMailBox( + n.firstBobChannelLink.ChanID(), + n.firstBobChannelLink.ShortChanID(), + ) + require.True(t, checkAlmostDust(n.firstBobChannelLink, bobMbox, false)) + // Assert that the HTLC is failed due to the dust threshold. err = n.bobServer.htlcSwitch.SendHTLC( aliceBobFirstHop, uint64(357), failingHtlc, @@ -3723,6 +3762,14 @@ func TestSwitchDustForwarding(t *testing.T) { OnionBlob: blob, } + // Wait until Alice's expected dust for the remote commitment is just + // under the dust threshold. + aliceOrch := n.aliceServer.htlcSwitch.mailOrchestrator + aliceMbox := aliceOrch.GetOrCreateMailBox( + n.aliceChannelLink.ChanID(), n.aliceChannelLink.ShortChanID(), + ) + require.True(t, checkAlmostDust(n.aliceChannelLink, aliceMbox, true)) + err = n.aliceServer.htlcSwitch.SendHTLC( n.aliceChannelLink.ShortChanID(), uint64(357), aliceMultihopHtlc, @@ -3792,8 +3839,17 @@ func sendDustHtlcs(t *testing.T, n *threeHopNetwork, alice bool, OnionBlob: blob, } - err = sendingSwitch.SendHTLC(sid, attemptID, htlc) - require.NoError(t, err) + for { + // It may be the case that the dust threshold is hit + // before all 357*2 HTLC's are sent due to double + // counting. Get around this by continuing to send + // until successful. + err = sendingSwitch.SendHTLC(sid, attemptID, htlc) + if err == nil { + break + } + } + attemptID++ } }