From 5b38ed0b3e769fc3bbbd703ab159ba0171d53835 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:47:47 -0700 Subject: [PATCH 01/13] htlcswitch/link: correct link log statement --- htlcswitch/link.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index b8a50eeab..bee3e6be8 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1179,7 +1179,7 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution, // Settle htlcs that returned a settle resolution using the preimage // in the resolution. case *invoices.HtlcSettleResolution: - l.log.Debugf("received settle resolution for %v"+ + l.log.Debugf("received settle resolution for %v "+ "with outcome: %v", circuitKey, res.Outcome) return l.settleHTLC(res.Preimage, htlc.pd) From f3051efeb308958920d4de9a32679a9bf28247fb Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:48:06 -0700 Subject: [PATCH 02/13] htlcswitch/mailbox: block until mailbox shutdown Fixes a bug where Stop() wouldn't actually wait for the mailbox to exit. --- htlcswitch/mailbox.go | 47 +++++++++++++++++++++++++++++++++----- htlcswitch/mailbox_test.go | 22 ++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 04c5cd6d8..e32d75a78 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -78,8 +78,9 @@ type memoryMailBox struct { pktOutbox chan *htlcPacket pktReset chan chan struct{} - wg sync.WaitGroup - quit chan struct{} + wireShutdown chan struct{} + pktShutdown chan struct{} + quit chan struct{} } // newMemoryMailBox creates a new instance of the memoryMailBox. @@ -92,6 +93,8 @@ func newMemoryMailBox() *memoryMailBox { msgReset: make(chan chan struct{}, 1), pktReset: make(chan chan struct{}, 1), pktIndex: make(map[CircuitKey]*list.Element), + wireShutdown: make(chan struct{}), + pktShutdown: make(chan struct{}), quit: make(chan struct{}), } box.wireCond = sync.NewCond(&box.wireMtx) @@ -122,7 +125,6 @@ const ( // NOTE: This method is part of the MailBox interface. func (m *memoryMailBox) Start() { m.started.Do(func() { - m.wg.Add(2) go m.mailCourier(wireCourier) go m.mailCourier(pktCourier) }) @@ -157,6 +159,7 @@ func (m *memoryMailBox) signalUntilReset(cType courierType, done chan struct{}) error { for { + switch cType { case wireCourier: m.wireCond.Signal() @@ -209,17 +212,49 @@ func (m *memoryMailBox) Stop() { m.stopped.Do(func() { close(m.quit) - m.wireCond.Signal() - m.pktCond.Signal() + m.signalUntilShutdown(wireCourier) + m.signalUntilShutdown(pktCourier) }) } +// signalUntilShutdown strobes the condition variable of the passed courier +// type, blocking until the worker has exited. +func (m *memoryMailBox) signalUntilShutdown(cType courierType) { + var ( + cond *sync.Cond + shutdown chan struct{} + ) + + switch cType { + case wireCourier: + cond = m.wireCond + shutdown = m.wireShutdown + case pktCourier: + cond = m.pktCond + shutdown = m.pktShutdown + } + + for { + select { + case <-time.After(time.Millisecond): + cond.Signal() + case <-shutdown: + return + } + } +} + // mailCourier is a dedicated goroutine whose job is to reliably deliver // messages of a particular type. There are two types of couriers: wire // couriers, and mail couriers. Depending on the passed courierType, this // goroutine will assume one of two roles. 
func (m *memoryMailBox) mailCourier(cType courierType) { - defer m.wg.Done() + switch cType { + case wireCourier: + defer close(m.wireShutdown) + case pktCourier: + defer close(m.pktShutdown) + } // TODO(roasbeef): refactor... diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index e8356c97d..83dfc282e 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -148,6 +148,28 @@ func TestMailBoxCouriers(t *testing.T) { } } +// TestMailBoxResetAfterShutdown tests that ResetMessages and ResetPackets +// return ErrMailBoxShuttingDown after the mailbox has been stopped. +func TestMailBoxResetAfterShutdown(t *testing.T) { + t.Parallel() + + m := newMemoryMailBox() + m.Start() + + // Stop the mailbox, then try to reset the message and packet couriers. + m.Stop() + + err := m.ResetMessages() + if err != ErrMailBoxShuttingDown { + t.Fatalf("expected ErrMailBoxShuttingDown, got: %v", err) + } + + err = m.ResetPackets() + if err != ErrMailBoxShuttingDown { + t.Fatalf("expected ErrMailBoxShuttingDown, got: %v", err) + } +} + // TestMailOrchestrator asserts that the orchestrator properly buffers packets // for channels that haven't been made live, such that they are delivered // immediately after BindLiveShortChanID. It also tests that packets are delivered From 12bbf28e651d9854f755eaa306708f608e3088b6 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:48:23 -0700 Subject: [PATCH 03/13] htlcswitch: make handleBatchFwdErrors a pure function --- htlcswitch/link.go | 22 +--------------------- htlcswitch/switch.go | 7 ++++--- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index bee3e6be8..9298ceb15 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2948,27 +2948,7 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) { } errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...) - go l.handleBatchFwdErrs(errChan) -} - -// handleBatchFwdErrs waits on the given errChan until it is closed, logging -// the errors returned from any unsuccessful forwarding attempts. -func (l *channelLink) handleBatchFwdErrs(errChan chan error) { - for { - err, ok := <-errChan - if !ok { - // Err chan has been drained or switch is shutting - // down. Either way, return. - return - } - - if err == nil { - continue - } - - l.log.Errorf("unhandled error while forwarding htlc packet over "+ - "htlcswitch: %v", err) - } + go handleBatchFwdErrs(errChan, l.log) } // sendHTLCError functions cancels HTLC and send cancel message back to the diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 52d05be8b..78ff10050 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -9,6 +9,7 @@ import ( "time" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btclog" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" @@ -1972,13 +1973,13 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { // link quit channel, meaning the send will fail only if the // switch receives a shutdown request. errChan := s.ForwardPackets(nil, switchPackets...) - go handleBatchFwdErrs(errChan) + go handleBatchFwdErrs(errChan, log) } } // handleBatchFwdErrs waits on the given errChan until it is closed, logging the // errors returned from any unsuccessful forwarding attempts. 
-func handleBatchFwdErrs(errChan chan error) { +func handleBatchFwdErrs(errChan chan error, l btclog.Logger) { for { err, ok := <-errChan if !ok { @@ -1991,7 +1992,7 @@ func handleBatchFwdErrs(errChan chan error) { continue } - log.Errorf("unhandled error while reforwarding htlc "+ + l.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) } } From 564534c829b230ed58fbcab25d84fd9306ff0da7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:48:40 -0700 Subject: [PATCH 04/13] htlcswitch: move packet failure to mailbox This commit moves the current logic for sending failures out of the link and into the mailbox in preparation for our failing delayed htlcs. We do so because the mailbox may need to fail packets while the link is offline, and needs to be able to complete the task without member methods on the link. --- htlcswitch/link.go | 68 +---------------- htlcswitch/mailbox.go | 149 ++++++++++++++++++++++++++++++++++--- htlcswitch/mailbox_test.go | 148 ++++++++++++++++++++++++++++++++++-- htlcswitch/switch.go | 18 +++-- 4 files changed, 293 insertions(+), 90 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 9298ceb15..afdd50cc0 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1272,72 +1272,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { l.log.Warnf("Unable to handle downstream add HTLC: %v", err) - var ( - localFailure = false - reason lnwire.OpaqueReason - ) - - // Create a temporary channel failure which we will send - // back to our peer if this is a forward, or report to - // the user if the failed payment was locally initiated. - failure := l.createFailureWithUpdate( - func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage { - return lnwire.NewTemporaryChannelFailure( - upd, - ) - }, - ) - - // If the payment was locally initiated (which is - // indicated by a nil obfuscator), we do not need to - // encrypt it back to the sender. - if pkt.obfuscator == nil { - var b bytes.Buffer - err := lnwire.EncodeFailure(&b, failure, 0) - if err != nil { - l.log.Errorf("unable to encode "+ - "failure: %v", err) - l.mailBox.AckPacket(pkt.inKey()) - return - } - reason = lnwire.OpaqueReason(b.Bytes()) - localFailure = true - } else { - // If the packet is part of a forward, - // (identified by a non-nil obfuscator) we need - // to encrypt the error back to the source. - var err error - reason, err = pkt.obfuscator.EncryptFirstHop(failure) - if err != nil { - l.log.Errorf("unable to "+ - "obfuscate error: %v", err) - l.mailBox.AckPacket(pkt.inKey()) - return - } - } - - // Create a link error containing the temporary channel - // failure and a detail which indicates the we failed to - // add the htlc. - linkError := NewDetailedLinkError( - failure, OutgoingFailureDownstreamHtlcAdd, - ) - - failPkt := &htlcPacket{ - incomingChanID: pkt.incomingChanID, - incomingHTLCID: pkt.incomingHTLCID, - circuit: pkt.circuit, - sourceRef: pkt.sourceRef, - hasSource: true, - localFailure: localFailure, - linkFailure: linkError, - htlc: &lnwire.UpdateFailHTLC{ - Reason: reason, - }, - } - - go l.forwardBatch(failPkt) - // Remove this packet from the link's mailbox, this // prevents it from being reprocessed if the link // restarts and resets it mailbox. If this response @@ -1346,7 +1280,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { // the switch, since the circuit was never fully opened, // and the forwarding package shows it as // unacknowledged. 
-			l.mailBox.AckPacket(pkt.inKey())
+			l.mailBox.FailAdd(pkt)
 			return
 		}
diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go
index e32d75a78..54c918b85 100644
--- a/htlcswitch/mailbox.go
+++ b/htlcswitch/mailbox.go
@@ -1,6 +1,7 @@
 package htlcswitch
 
 import (
+	"bytes"
 	"container/list"
 	"errors"
 	"sync"
@@ -31,8 +32,17 @@ type MailBox interface {
 
 	// AckPacket removes a packet from the mailboxes in-memory replay
 	// buffer. This will prevent a packet from being delivered after a link
-	// restarts if the switch has remained online.
-	AckPacket(CircuitKey)
+	// restarts if the switch has remained online. The returned boolean
+	// indicates whether or not a packet with the passed incoming circuit
+	// key was removed.
+	AckPacket(CircuitKey) bool
+
+	// FailAdd fails an UpdateAddHTLC that exists within the mailbox,
+	// removing it from the in-memory replay buffer. This will prevent the
+	// packet from being delivered after the link restarts if the switch has
+	// remained online. The generated LinkError will show an
+	// OutgoingFailureDownstreamHtlcAdd FailureDetail.
+	FailAdd(pkt *htlcPacket)
 
 	// MessageOutBox returns a channel that any new messages ready for
 	// delivery will be sent on.
@@ -56,12 +66,29 @@ type MailBox interface {
 	Stop()
 }
 
+type mailBoxConfig struct {
+	// shortChanID is the short channel id of the channel this mailbox
+	// belongs to.
+	shortChanID lnwire.ShortChannelID
+
+	// fetchUpdate retrieves the most recent channel update for the channel
+	// this mailbox belongs to.
+	fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
+
+	// forwardPackets sends a variadic number of htlcPackets to the switch
+	// to be routed. A quit channel should be provided so that the call can
+	// properly exit during shutdown.
+	forwardPackets func(chan struct{}, ...*htlcPacket) chan error
+}
+
 // memoryMailBox is an implementation of the MailBox struct backed by purely
 // in-memory queues.
 type memoryMailBox struct {
 	started sync.Once
 	stopped sync.Once
 
+	cfg *mailBoxConfig
+
 	wireMessages *list.List
 	wireMtx      sync.Mutex
 	wireCond     *sync.Cond
@@ -84,8 +111,9 @@ type memoryMailBox struct {
 }
 
 // newMemoryMailBox creates a new instance of the memoryMailBox.
-func newMemoryMailBox() *memoryMailBox {
+func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
 	box := &memoryMailBox{
+		cfg:           cfg,
 		wireMessages:  list.New(),
 		htlcPkts:      list.New(),
 		messageOutbox: make(chan lnwire.Message),
@@ -179,20 +207,23 @@
 }
 
 // AckPacket removes the packet identified by it's incoming circuit key from the
-// queue of packets to be delivered.
+// queue of packets to be delivered. The returned boolean indicates whether or
+// not a packet with the passed incoming circuit key was removed.
 //
 // NOTE: It is safe to call this method multiple times for the same circuit key.
-func (m *memoryMailBox) AckPacket(inKey CircuitKey) {
+func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
 	m.pktCond.L.Lock()
 	entry, ok := m.pktIndex[inKey]
 	if !ok {
 		m.pktCond.L.Unlock()
-		return
+		return false
 	}
 
 	m.htlcPkts.Remove(entry)
 	delete(m.pktIndex, inKey)
 	m.pktCond.L.Unlock()
+
+	return true
 }
 
 // HasPacket queries the packets for a circuit key, this is used to drop packets
@@ -410,6 +441,80 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 	return nil
 }
 
+// FailAdd fails an UpdateAddHTLC that exists within the mailbox, removing it
+// from the in-memory replay buffer. This will prevent the packet from being
+// delivered after the link restarts if the switch has remained online. The
+// generated LinkError will show an OutgoingFailureDownstreamHtlcAdd
+// FailureDetail.
+func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
+	// First, remove the packet from the mailbox. If we didn't find the
+	// packet because it has already been acked, we'll exit early to avoid
+	// sending a duplicate fail message through the switch.
+	if !m.AckPacket(pkt.inKey()) {
+		return
+	}
+
+	var (
+		localFailure = false
+		reason       lnwire.OpaqueReason
+	)
+
+	// Create a temporary channel failure which we will send back to our
+	// peer if this is a forward, or report to the user if the failed
+	// payment was locally initiated.
+	var failure lnwire.FailureMessage
+	update, err := m.cfg.fetchUpdate(m.cfg.shortChanID)
+	if err != nil {
+		failure = &lnwire.FailTemporaryNodeFailure{}
+	} else {
+		failure = lnwire.NewTemporaryChannelFailure(update)
+	}
+
+	// If the payment was locally initiated (which is indicated by a nil
+	// obfuscator), we do not need to encrypt it back to the sender.
+	if pkt.obfuscator == nil {
+		var b bytes.Buffer
+		err := lnwire.EncodeFailure(&b, failure, 0)
+		if err != nil {
+			log.Errorf("Unable to encode failure: %v", err)
+			return
+		}
+		reason = lnwire.OpaqueReason(b.Bytes())
+		localFailure = true
+	} else {
+		// If the packet is part of a forward (identified by a non-nil
+		// obfuscator), we need to encrypt the error back to the source.
+		var err error
+		reason, err = pkt.obfuscator.EncryptFirstHop(failure)
+		if err != nil {
+			log.Errorf("Unable to obfuscate error: %v", err)
+			return
+		}
+	}
+
+	// Create a link error containing the temporary channel failure and a
+	// detail which indicates that we failed to add the htlc.
+	linkError := NewDetailedLinkError(
+		failure, OutgoingFailureDownstreamHtlcAdd,
+	)
+
+	failPkt := &htlcPacket{
+		incomingChanID: pkt.incomingChanID,
+		incomingHTLCID: pkt.incomingHTLCID,
+		circuit:        pkt.circuit,
+		sourceRef:      pkt.sourceRef,
+		hasSource:      true,
+		localFailure:   localFailure,
+		linkFailure:    linkError,
+		htlc: &lnwire.UpdateFailHTLC{
+			Reason: reason,
+		},
+	}
+
+	errChan := m.cfg.forwardPackets(m.quit, failPkt)
+	go handleBatchFwdErrs(errChan, log)
+}
+
 // MessageOutBox returns a channel that any new messages ready for delivery
 // will be sent on.
 //
@@ -434,6 +539,8 @@ func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
 type mailOrchestrator struct {
 	mu sync.RWMutex
 
+	cfg *mailOrchConfig
+
 	// mailboxes caches exactly one mailbox for all known channels.
 	mailboxes map[lnwire.ChannelID]MailBox
 
@@ -454,9 +561,21 @@ type mailOrchestrator struct {
 	unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
 }
 
+type mailOrchConfig struct {
+	// forwardPackets sends a variadic number of htlcPackets to the switch
+	// to be routed. A quit channel should be provided so that the call can
+	// properly exit during shutdown.
+	forwardPackets func(chan struct{}, ...*htlcPacket) chan error
+
+	// fetchUpdate retrieves the most recent channel update for the channel
+	// this mailbox belongs to.
+	fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
+}
+
 // newMailOrchestrator initializes a fresh mailOrchestrator.
-func newMailOrchestrator() *mailOrchestrator { +func newMailOrchestrator(cfg *mailOrchConfig) *mailOrchestrator { return &mailOrchestrator{ + cfg: cfg, mailboxes: make(map[lnwire.ChannelID]MailBox), liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID), unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket), @@ -472,7 +591,9 @@ func (mo *mailOrchestrator) Stop() { // GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or // creates and returns a new mailbox if none is found. -func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox { +func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID, + shortChanID lnwire.ShortChannelID) MailBox { + // First, try lookup the mailbox directly using only the shared mutex. mo.mu.RLock() mailbox, ok := mo.mailboxes[chanID] @@ -485,7 +606,7 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox // Otherwise, we will try again with exclusive lock, creating a mailbox // if one still has not been created. mo.mu.Lock() - mailbox = mo.exclusiveGetOrCreateMailBox(chanID) + mailbox = mo.exclusiveGetOrCreateMailBox(chanID, shortChanID) mo.mu.Unlock() return mailbox @@ -497,11 +618,15 @@ func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox // // NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock. func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox( - chanID lnwire.ChannelID) MailBox { + chanID lnwire.ChannelID, shortChanID lnwire.ShortChannelID) MailBox { mailbox, ok := mo.mailboxes[chanID] if !ok { - mailbox = newMemoryMailBox() + mailbox = newMemoryMailBox(&mailBoxConfig{ + shortChanID: shortChanID, + fetchUpdate: mo.cfg.fetchUpdate, + forwardPackets: mo.cfg.forwardPackets, + }) mailbox.Start() mo.mailboxes[chanID] = mailbox } @@ -581,7 +706,7 @@ func (mo *mailOrchestrator) Deliver( // index should only be set if the mailbox had been initialized // beforehand. However, this does ensure that this case is // handled properly in the event that it could happen. - mailbox = mo.exclusiveGetOrCreateMailBox(chanID) + mailbox = mo.exclusiveGetOrCreateMailBox(chanID, sid) mo.mu.Unlock() // Deliver the packet to the mailbox if it was found or created. diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index 83dfc282e..ccf35e879 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -19,7 +19,7 @@ func TestMailBoxCouriers(t *testing.T) { // First, we'll create new instance of the current default mailbox // type. - mailBox := newMemoryMailBox() + mailBox := newMemoryMailBox(&mailBoxConfig{}) mailBox.Start() defer mailBox.Stop() @@ -153,7 +153,7 @@ func TestMailBoxCouriers(t *testing.T) { func TestMailBoxResetAfterShutdown(t *testing.T) { t.Parallel() - m := newMemoryMailBox() + m := newMemoryMailBox(&mailBoxConfig{}) m.Start() // Stop the mailbox, then try to reset the message and packet couriers. 
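The GetOrCreateMailBox/exclusiveGetOrCreateMailBox pair above is an instance of Go's double-checked locking idiom: a shared read lock serves the common lookup case, and the exclusive lock is only taken, with a re-check, when the mailbox must be created. A minimal standalone sketch of the idiom (illustrative names, not lnd's actual types):

```go
package main

import "sync"

type registry struct {
	mu    sync.RWMutex
	boxes map[string]*struct{}
}

func (r *registry) getOrCreate(key string) *struct{} {
	// Fast path: shared lock for the common lookup case.
	r.mu.RLock()
	box, ok := r.boxes[key]
	r.mu.RUnlock()
	if ok {
		return box
	}

	// Slow path: take the exclusive lock and re-check, since another
	// goroutine may have created the entry in the meantime.
	r.mu.Lock()
	defer r.mu.Unlock()
	if box, ok := r.boxes[key]; ok {
		return box
	}
	box = &struct{}{}
	r.boxes[key] = box
	return box
}
```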
@@ -170,6 +170,144 @@ func TestMailBoxResetAfterShutdown(t *testing.T) {
 	}
 }
 
+type mailboxContext struct {
+	t        *testing.T
+	mailbox  MailBox
+	forwards chan *htlcPacket
+}
+
+func newMailboxContext(t *testing.T) *mailboxContext {
+
+	ctx := &mailboxContext{
+		t:        t,
+		forwards: make(chan *htlcPacket, 1),
+	}
+	ctx.mailbox = newMemoryMailBox(&mailBoxConfig{
+		fetchUpdate: func(sid lnwire.ShortChannelID) (
+			*lnwire.ChannelUpdate, error) {
+			return &lnwire.ChannelUpdate{
+				ShortChannelID: sid,
+			}, nil
+		},
+		forwardPackets: ctx.forward,
+	})
+	ctx.mailbox.Start()
+
+	return ctx
+}
+
+func (c *mailboxContext) forward(_ chan struct{},
+	pkts ...*htlcPacket) chan error {
+
+	for _, pkt := range pkts {
+		c.forwards <- pkt
+	}
+
+	errChan := make(chan error)
+	close(errChan)
+
+	return errChan
+}
+
+func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket {
+	c.t.Helper()
+
+	sentPackets := make([]*htlcPacket, num)
+	for i := 0; i < num; i++ {
+		pkt := &htlcPacket{
+			outgoingChanID: lnwire.NewShortChanIDFromInt(
+				uint64(prand.Int63())),
+			incomingChanID: lnwire.NewShortChanIDFromInt(
+				uint64(prand.Int63())),
+			incomingHTLCID: uint64(start + i),
+			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(start + i),
+			},
+		}
+		sentPackets[i] = pkt
+
+		err := c.mailbox.AddPacket(pkt)
+		if err != nil {
+			c.t.Fatalf("unable to add packet: %v", err)
+		}
+	}
+
+	return sentPackets
+}
+
+func (c *mailboxContext) receivePkts(pkts []*htlcPacket) {
+	c.t.Helper()
+
+	for i, expPkt := range pkts {
+		select {
+		case pkt := <-c.mailbox.PacketOutBox():
+			if reflect.DeepEqual(expPkt, pkt) {
+				continue
+			}
+
+			c.t.Fatalf("inkey mismatch #%d, want: %v vs "+
+				"got: %v", i, expPkt.inKey(), pkt.inKey())
+
+		case <-time.After(50 * time.Millisecond):
+			c.t.Fatalf("did not receive packet %d", i)
+		}
+	}
+}
+
+func (c *mailboxContext) checkFails(adds []*htlcPacket) {
+	c.t.Helper()
+
+	for i, add := range adds {
+		select {
+		case fail := <-c.forwards:
+			if add.inKey() == fail.inKey() {
+				continue
+			}
+			c.t.Fatalf("inkey mismatch #%d, add: %v vs fail: %v",
+				i, add.inKey(), fail.inKey())
+
+		case <-time.After(50 * time.Millisecond):
+			c.t.Fatalf("did not receive fail for index %d", i)
+		}
+	}
+
+	select {
+	case pkt := <-c.forwards:
+		c.t.Fatalf("unexpected forward: %v", pkt)
+	case <-time.After(50 * time.Millisecond):
+	}
+}
+
+// TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
+// under various interleavings with other operations on the mailbox.
+func TestMailBoxFailAdd(t *testing.T) {
+	ctx := newMailboxContext(t)
+	defer ctx.mailbox.Stop()
+
+	failAdds := func(adds []*htlcPacket) {
+		for _, add := range adds {
+			ctx.mailbox.FailAdd(add)
+		}
+	}
+
+	const numBatchPackets = 5
+
+	// Send a batch of adds, and pull them from the mailbox.
+	adds := ctx.sendAdds(0, numBatchPackets)
+	ctx.receivePkts(adds)
+
+	// Fail all of these adds, simulating an error adding the HTLCs to the
+	// commitment. We should see a failure message for each.
+	go failAdds(adds)
+	ctx.checkFails(adds)
+
+	// As a sanity check, Fail all of them again and assert that no
+	// duplicate fails are sent.
+	go failAdds(adds)
+	ctx.checkFails(nil)
+}
+
 // TestMailOrchestrator asserts that the orchestrator properly buffers packets
 // for channels that haven't been made live, such that they are delivered
 // immediately after BindLiveShortChanID.
It also tests that packets are delivered @@ -178,7 +316,7 @@ func TestMailOrchestrator(t *testing.T) { t.Parallel() // First, we'll create a new instance of our orchestrator. - mo := newMailOrchestrator() + mo := newMailOrchestrator(&mailOrchConfig{}) defer mo.Stop() // We'll be delivering 10 htlc packets via the orchestrator. @@ -203,7 +341,7 @@ func TestMailOrchestrator(t *testing.T) { } // Now, initialize a new mailbox for Alice's chanid. - mailbox := mo.GetOrCreateMailBox(chanID1) + mailbox := mo.GetOrCreateMailBox(chanID1, aliceChanID) // Verify that no messages are received, since Alice's mailbox has not // been made live. @@ -248,7 +386,7 @@ func TestMailOrchestrator(t *testing.T) { // For the second half of the test, create a new mailbox for Bob and // immediately make it live with an assigned short chan id. - mailbox = mo.GetOrCreateMailBox(chanID2) + mailbox = mo.GetOrCreateMailBox(chanID2, bobChanID) mo.BindLiveShortChanID(mailbox, chanID2, bobChanID) // Create the second half of our htlcs, and deliver them via the diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 78ff10050..ba57ed11f 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -283,12 +283,11 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { return nil, err } - return &Switch{ + s := &Switch{ bestHeight: currentHeight, cfg: &cfg, circuits: circuitMap, linkIndex: make(map[lnwire.ChannelID]ChannelLink), - mailOrchestrator: newMailOrchestrator(), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), @@ -297,7 +296,14 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) { chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), quit: make(chan struct{}), - }, nil + } + + s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{ + fetchUpdate: s.cfg.FetchLastChannelUpdate, + forwardPackets: s.ForwardPackets, + }) + + return s, nil } // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done @@ -2037,7 +2043,8 @@ func (s *Switch) AddLink(link ChannelLink) error { // Get and attach the mailbox for this link, which buffers packets in // case there packets that we tried to deliver while this link was // offline. - mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) + shortChanID := link.ShortChanID() + mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) link.AttachMailBox(mailbox) if err := link.Start(); err != nil { @@ -2045,7 +2052,6 @@ func (s *Switch) AddLink(link ChannelLink) error { return err } - shortChanID := link.ShortChanID() if shortChanID == hop.Source { log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", chanID, shortChanID) @@ -2217,7 +2223,7 @@ func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error { // Finally, alert the mail orchestrator to the change of short channel // ID, and deliver any unclaimed packets to the link. - mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID) + mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) s.mailOrchestrator.BindLiveShortChanID( mailbox, chanID, shortChanID, ) From 63f3d0b0120c7684eeb87ea998561bc1fe2ba4eb Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:48:58 -0700 Subject: [PATCH 05/13] htlcswitch/mailbox: advance packet head after delivery This commit delays the advancement of the pktHead until after the message has been delivered. 
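The effect of the change is easiest to see in a standalone sketch of the courier's new peek-then-advance discipline, where deliver is a hypothetical stand-in for handing a packet to the link:

```go
package main

import (
	"container/list"
	"fmt"
)

// deliver stands in for handing a packet off to the link; it can fail,
// for example because the link went offline or a deadline expired.
func deliver(pkt int) bool { return pkt < 4 }

func main() {
	queue := list.New()
	for i := 0; i < 6; i++ {
		queue.PushBack(i)
	}

	// Peek at the head first, and only advance after a successful
	// hand-off: a failed delivery leaves the head in place, so the
	// packet can be re-delivered or failed back later.
	for head := queue.Front(); head != nil; {
		if !deliver(head.Value.(int)) {
			break
		}
		head = head.Next()
	}

	// Nothing was removed, only peeked; removal happens separately.
	fmt.Println("still queued:", queue.Len())
}
```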
This is a preparatory step, as in the future we may fail to
deliver the packet due to a deadline expiring.
--- htlcswitch/mailbox.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go
index 54c918b85..d0b509d56 100644
--- a/htlcswitch/mailbox.go
+++ b/htlcswitch/mailbox.go
@@ -219,6 +219,13 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
 		return false
 	}
 
+	// Check whether we are removing the head of the queue. If so, we must
+	// advance the head to the next packet before removing. It's possible
+	// that the courier has already adanced the pktHead, so this check
+	// prevents the pktHead from getting desynchronized.
+	if entry == m.pktHead {
+		m.pktHead = entry.Next()
+	}
 	m.htlcPkts.Remove(entry)
 	delete(m.pktIndex, inKey)
 	m.pktCond.L.Unlock()
@@ -333,8 +340,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		}
 
 		var (
-			nextPkt *htlcPacket
-			nextMsg lnwire.Message
+			nextPkt   *htlcPacket
+			nextPktEl *list.Element
+			nextMsg   lnwire.Message
 		)
 		switch cType {
 		// Grab the datum off the front of the queue, shifting the
@@ -350,7 +358,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		// re-delivered once the link comes back online.
 		case pktCourier:
 			nextPkt = m.pktHead.Value.(*htlcPacket)
-			m.pktHead = m.pktHead.Next()
+			nextPktEl = m.pktHead
 		}
 
 		// Now that we're done with the condition, we can unlock it to
@@ -382,6 +390,14 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		case pktCourier:
 			select {
 			case m.pktOutbox <- nextPkt:
+				m.pktCond.L.Lock()
+				// Only advance the pktHead if this packet
+				// is still at the head of the queue.
+				if m.pktHead != nil && m.pktHead == nextPktEl {
+					m.pktHead = m.pktHead.Next()
+				}
+				m.pktCond.L.Unlock()
+
 			case pktDone := <-m.pktReset:
 				m.pktCond.L.Lock()
 				m.pktHead = m.htlcPkts.Front()

From 37dca27a3d51b10844a6831124e659bf06264a08 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 14 Apr 2020 10:49:26 -0700
Subject: [PATCH 06/13] htlcswitch: thread clock from switch to mailbox

--- htlcswitch/mailbox.go | 19 +++++++++++++++++++ htlcswitch/mailbox_test.go | 20 ++++++++++++++++---- htlcswitch/mock.go | 2 ++ htlcswitch/switch.go | 16 ++++++++++++++++ server.go | 2 ++ 5 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go
index d0b509d56..1c1ad5c40 100644
--- a/htlcswitch/mailbox.go
+++ b/htlcswitch/mailbox.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
 
@@ -79,6 +80,14 @@ type mailBoxConfig struct {
 	// be routed. A quit channel should be provided so that the call can
 	// properly exit during shutdown.
 	forwardPackets func(chan struct{}, ...*htlcPacket) chan error
+
+	// clock is a time source for the mailbox.
+	clock clock.Clock
+
+	// expiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered. The computed deadline will expire
+	// this long after the Adds are added via AddPacket.
+	expiry time.Duration
 }
 
 // memoryMailBox is an implementation of the MailBox struct backed by purely
@@ -586,6 +595,14 @@ type mailOrchConfig struct {
 	// fetchUpdate retrieves the most recent channel update for the channel
 	// this mailbox belongs to.
 	fetchUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
+
+	// clock is a time source for the generated mailboxes.
+	clock clock.Clock
+
+	// expiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered. The computed deadline will expire
+	// this long after the Adds are added to a mailbox via AddPacket.
+	expiry time.Duration
 }
 
 // newMailOrchestrator initializes a fresh mailOrchestrator.
@@ -642,6 +659,8 @@ func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
 			shortChanID:    shortChanID,
 			fetchUpdate:    mo.cfg.fetchUpdate,
 			forwardPackets: mo.cfg.forwardPackets,
+			clock:          mo.cfg.clock,
+			expiry:         mo.cfg.expiry,
 		})
 		mailbox.Start()
 		mo.mailboxes[chanID] = mailbox
diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go
index ccf35e879..040f2d343 100644
--- a/htlcswitch/mailbox_test.go
+++ b/htlcswitch/mailbox_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/davecgh/go-spew/spew"
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
 
@@ -19,7 +20,10 @@ func TestMailBoxCouriers(t *testing.T) {
 
 	// First, we'll create new instance of the current default mailbox
 	// type.
-	mailBox := newMemoryMailBox(&mailBoxConfig{})
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
 	mailBox.Start()
 	defer mailBox.Stop()
 
@@ -172,14 +176,17 @@ func TestMailBoxResetAfterShutdown(t *testing.T) {
 
 type mailboxContext struct {
 	t        *testing.T
+	clock    *clock.TestClock
 	mailbox  MailBox
 	forwards chan *htlcPacket
 }
 
-func newMailboxContext(t *testing.T) *mailboxContext {
+func newMailboxContext(t *testing.T, startTime time.Time,
+	expiry time.Duration) *mailboxContext {
 
 	ctx := &mailboxContext{
 		t:        t,
+		clock:    clock.NewTestClock(startTime),
 		forwards: make(chan *htlcPacket, 1),
 	}
 	ctx.mailbox = newMemoryMailBox(&mailBoxConfig{
@@ -190,6 +197,8 @@ func newMailboxContext(t *testing.T) *mailboxContext {
 		},
 		forwardPackets: ctx.forward,
+		clock:          ctx.clock,
+		expiry:         expiry,
 	})
 	ctx.mailbox.Start()
 
@@ -282,7 +291,7 @@
 // TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
 // under various interleavings with other operations on the mailbox.
 func TestMailBoxFailAdd(t *testing.T) {
-	ctx := newMailboxContext(t)
+	ctx := newMailboxContext(t, time.Now(), time.Minute)
 	defer ctx.mailbox.Stop()
 
 	failAdds := func(adds []*htlcPacket) {
@@ -316,7 +325,10 @@ func TestMailOrchestrator(t *testing.T) {
 	t.Parallel()
 
 	// First, we'll create a new instance of our orchestrator.
-	mo := newMailOrchestrator(&mailOrchConfig{})
+	mo := newMailOrchestrator(&mailOrchConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
 	defer mo.Stop()
 
 	// We'll be delivering 10 htlc packets via the orchestrator.
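The tests above drive expiry deterministically with clock.NewTestClock. A minimal sketch of the lnd clock package behaviour they rely on, assuming the test clock releases pending TickAfter channels once SetTime moves past their deadline:

```go
package main

import (
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/clock"
)

func main() {
	start := time.Date(2020, 4, 14, 0, 0, 0, 0, time.UTC)
	c := clock.NewTestClock(start)

	// Register a tick one minute out; nothing fires until the test
	// clock is explicitly advanced past that deadline.
	tick := c.TickAfter(time.Minute)

	c.SetTime(start.Add(2 * time.Minute))

	<-tick
	fmt.Println("deadline fired at", c.Now())
}
```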
diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go
index c65e5fb01..e9a2a1efa 100644
--- a/htlcswitch/mock.go
+++ b/htlcswitch/mock.go
@@ -177,6 +177,8 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
 		LogEventTicker: ticker.NewForce(DefaultLogInterval),
 		AckEventTicker: ticker.NewForce(DefaultAckInterval),
 		HtlcNotifier:   &mockHTLCNotifier{},
+		Clock:          clock.NewDefaultClock(),
+		HTLCExpiry:     time.Hour,
 	}
 
 	return New(cfg, startingHeight)
diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go
index ba57ed11f..06c597bf6 100644
--- a/htlcswitch/switch.go
+++ b/htlcswitch/switch.go
@@ -15,6 +15,7 @@ import (
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+	"github.com/lightningnetwork/lnd/clock"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch/hop"
 	"github.com/lightningnetwork/lnd/lntypes"
@@ -36,6 +37,10 @@ const (
 	// DefaultAckInterval is the duration between attempts to ack any settle
 	// fails in a forwarding package.
 	DefaultAckInterval = 15 * time.Second
+
+	// DefaultHTLCExpiry is the duration after which Adds will be cancelled
+	// if they could not get added to an outgoing commitment.
+	DefaultHTLCExpiry = time.Minute
 )
 
 var (
@@ -174,6 +179,15 @@ type Config struct {
 	// RejectHTLC is a flag that instructs the htlcswitch to reject any
 	// HTLCs that are not from the source hop.
 	RejectHTLC bool
+
+	// Clock is a time source for the switch.
+	Clock clock.Clock
+
+	// HTLCExpiry is the interval after which Adds will be cancelled if they
+	// have not yet been delivered to a link. The computed deadline
+	// will expire this long after the Adds are added to a mailbox via
+	// AddPacket.
+	HTLCExpiry time.Duration
 }
 
 // Switch is the central messaging bus for all incoming/outgoing HTLCs.
@@ -301,6 +315,8 @@ func New(cfg Config, currentHeight uint32) (*Switch, error) {
 	s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
 		fetchUpdate:    s.cfg.FetchLastChannelUpdate,
 		forwardPackets: s.ForwardPackets,
+		clock:          s.cfg.Clock,
+		expiry:         s.cfg.HTLCExpiry,
 	})
 
 	return s, nil
diff --git a/server.go b/server.go
index f8c36b306..3fdffb2c3 100644
--- a/server.go
+++ b/server.go
@@ -496,6 +496,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
 		AckEventTicker:     ticker.New(htlcswitch.DefaultAckInterval),
 		AllowCircularRoute: cfg.AllowCircularRoute,
 		RejectHTLC:         cfg.RejectHTLC,
+		Clock:              clock.NewDefaultClock(),
+		HTLCExpiry:         htlcswitch.DefaultHTLCExpiry,
 	}, uint32(currentHeight))
 	if err != nil {
 		return nil, err

From 1aa2dde4a43651645d0ada6b791a0e7c10a4e4f8 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 14 Apr 2020 10:49:48 -0700
Subject: [PATCH 07/13] htlcswitch/mailbox: prioritize settles/fails over adds

This commit splits the packet courier internally into two distinct
queues, one for adds and one for settles+fails. This allows us to
prioritize HTLCs that will clear the commitment transaction and make
space for adds. Previously this responsibility was handled by the
overflow queue.
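The split delivery below rests on a standard Go idiom: a send on a nil channel blocks forever, so a select case can be disarmed by leaving its channel nil. A toy version of the prioritization, not the courier itself:

```go
package main

import "fmt"

func main() {
	outbox := make(chan string, 1)

	settleFail, add := "settle", "add" // heads of the two queues

	// Arm exactly one send case. While a Settle/Fail is pending, the
	// Add case stays nil and can never be chosen.
	var settleFailOut, addOut chan string
	if settleFail != "" {
		settleFailOut = outbox
	} else {
		addOut = outbox
	}

	select {
	case settleFailOut <- settleFail:
	case addOut <- add:
	}

	fmt.Println(<-outbox) // always "settle" while both heads are populated
}
```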
--- htlcswitch/mailbox.go | 153 +++++++++++++++++++++++++++++-------- htlcswitch/mailbox_test.go | 109 ++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 33 deletions(-) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 1c1ad5c40..4eff70260 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "errors" + "fmt" "sync" "time" @@ -108,8 +109,13 @@ type memoryMailBox struct { htlcPkts *list.List pktIndex map[CircuitKey]*list.Element pktHead *list.Element - pktMtx sync.Mutex - pktCond *sync.Cond + + addPkts *list.List + addIndex map[CircuitKey]*list.Element + addHead *list.Element + + pktMtx sync.Mutex + pktCond *sync.Cond pktOutbox chan *htlcPacket pktReset chan chan struct{} @@ -125,11 +131,13 @@ func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox { cfg: cfg, wireMessages: list.New(), htlcPkts: list.New(), + addPkts: list.New(), messageOutbox: make(chan lnwire.Message), pktOutbox: make(chan *htlcPacket), msgReset: make(chan chan struct{}, 1), pktReset: make(chan chan struct{}, 1), pktIndex: make(map[CircuitKey]*list.Element), + addIndex: make(map[CircuitKey]*list.Element), wireShutdown: make(chan struct{}), pktShutdown: make(chan struct{}), quit: make(chan struct{}), @@ -222,24 +230,39 @@ func (m *memoryMailBox) signalUntilReset(cType courierType, // NOTE: It is safe to call this method multiple times for the same circuit key. func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool { m.pktCond.L.Lock() - entry, ok := m.pktIndex[inKey] - if !ok { - m.pktCond.L.Unlock() - return false + defer m.pktCond.L.Unlock() + + if entry, ok := m.pktIndex[inKey]; ok { + // Check whether we are removing the head of the queue. If so, + // we must advance the head to the next packet before removing. + // It's possible that the courier has already advanced the + // pktHead, so this check prevents the pktHead from getting + // desynchronized. + if entry == m.pktHead { + m.pktHead = entry.Next() + } + m.htlcPkts.Remove(entry) + delete(m.pktIndex, inKey) + + return true } - // Check whether we are removing the head of the queue. If so, we must - // advance the head to the next packet before removing. It's possible - // that the courier has already adanced the pktHead, so this check - // prevents the pktHead from getting desynchronized. - if entry == m.pktHead { - m.pktHead = entry.Next() - } - m.htlcPkts.Remove(entry) - delete(m.pktIndex, inKey) - m.pktCond.L.Unlock() + if entry, ok := m.addIndex[inKey]; ok { + // Check whether we are removing the head of the queue. If so, + // we must advance the head to the next add before removing. + // It's possible that the courier has already advanced the + // addHead, so this check prevents the addHead from getting + // desynchronized. + if entry == m.addHead { + m.addHead = entry.Next() + } + m.addPkts.Remove(entry) + delete(m.addIndex, inKey) - return true + return true + } + + return false } // HasPacket queries the packets for a circuit key, this is used to drop packets @@ -328,7 +351,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { case pktCourier: m.pktCond.L.Lock() - for m.pktHead == nil { + for m.pktHead == nil && m.addHead == nil { m.pktCond.Wait() select { @@ -338,6 +361,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { // reconnect. 
case pktDone := <-m.pktReset:
 					m.pktHead = m.htlcPkts.Front()
+					m.addHead = m.addPkts.Front()
 
 					close(pktDone)
 
 				case <-m.quit:
@@ -351,6 +375,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		var (
 			nextPkt   *htlcPacket
 			nextPktEl *list.Element
+			nextAdd   *htlcPacket
+			nextAddEl *list.Element
 			nextMsg   lnwire.Message
 		)
 		switch cType {
 		// Grab the datum off the front of the queue, shifting the
@@ -366,8 +392,15 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		// doesn't make it into a commitment, then it'll be
 		// re-delivered once the link comes back online.
 		case pktCourier:
-			nextPkt = m.pktHead.Value.(*htlcPacket)
-			nextPktEl = m.pktHead
+			// Peek at the next item to deliver, prioritizing
+			// Settle/Fail packets over Adds.
+			if m.pktHead != nil {
+				nextPkt = m.pktHead.Value.(*htlcPacket)
+				nextPktEl = m.pktHead
+			} else {
+				nextAdd = m.addHead.Value.(*htlcPacket)
+				nextAddEl = m.addHead
+			}
 		}
 
 		// Now that we're done with the condition, we can unlock it to
@@ -397,22 +430,56 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			}
 
 		case pktCourier:
+			var (
+				pktOutbox chan *htlcPacket
+				addOutbox chan *htlcPacket
+			)
+
+			// Prioritize delivery of Settle/Fail packets over Adds.
+			// This ensures that we actively clear the commitment of
+			// existing HTLCs before trying to add new ones. This
+			// can help to improve forwarding performance since the
+			// time to sign a commitment is linear in the number of
+			// HTLCs manifested on the commitments.
+			//
+			// NOTE: Both types are eventually delivered over the
+			// same channel, but we can control which is delivered
+			// by exclusively making one nil and the other non-nil.
+			// We know from our loop condition that at least one
+			// of nextPkt and nextAdd is non-nil.
+			if nextPkt != nil {
+				pktOutbox = m.pktOutbox
+			} else {
+				addOutbox = m.pktOutbox
+			}
+
 			select {
-			case m.pktOutbox <- nextPkt:
+			case pktOutbox <- nextPkt:
 				m.pktCond.L.Lock()
-				// Only advance the pktHead if this packet
-				// is still at the head of the queue.
+				// Only advance the pktHead if this Settle or
+				// Fail is still at the head of the queue.
 				if m.pktHead != nil && m.pktHead == nextPktEl {
 					m.pktHead = m.pktHead.Next()
 				}
 				m.pktCond.L.Unlock()
 
+			case addOutbox <- nextAdd:
+				m.pktCond.L.Lock()
+				// Only advance the addHead if this Add is still
+				// at the head of the queue.
+				if m.addHead != nil && m.addHead == nextAddEl {
+					m.addHead = m.addHead.Next()
+				}
+				m.pktCond.L.Unlock()
+
 			case pktDone := <-m.pktReset:
 				m.pktCond.L.Lock()
 				m.pktHead = m.htlcPkts.Front()
+				m.addHead = m.addPkts.Front()
 				m.pktCond.L.Unlock()
 
 				close(pktDone)
+
 			case <-m.quit:
 				return
 			}
@@ -444,18 +511,38 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
-	// First, we'll lock the condition, and add the packet to the end of
-	// the htlc packet inbox.
 	m.pktCond.L.Lock()
+	switch htlc := pkt.htlc.(type) {
 
-	if _, ok := m.pktIndex[pkt.inKey()]; ok {
-		m.pktCond.L.Unlock()
-		return nil
-	}
+	// Split off Settle/Fail packets into the htlcPkts queue.
+	case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
+		if _, ok := m.pktIndex[pkt.inKey()]; ok {
+			m.pktCond.L.Unlock()
+			return nil
+		}
+
+		entry := m.htlcPkts.PushBack(pkt)
+		m.pktIndex[pkt.inKey()] = entry
+		if m.pktHead == nil {
+			m.pktHead = entry
+		}
 
-	entry := m.htlcPkts.PushBack(pkt)
-	m.pktIndex[pkt.inKey()] = entry
-	if m.pktHead == nil {
-		m.pktHead = entry
+	// Split off Add packets into the addPkts queue.
+	case *lnwire.UpdateAddHTLC:
+		if _, ok := m.addIndex[pkt.inKey()]; ok {
+			m.pktCond.L.Unlock()
+			return nil
+		}
+
+		entry := m.addPkts.PushBack(pkt)
+		m.addIndex[pkt.inKey()] = entry
+		if m.addHead == nil {
+			m.addHead = entry
+		}
+
+	default:
+		m.pktCond.L.Unlock()
+		return fmt.Errorf("unknown htlc type: %T", htlc)
 	}
 	m.pktCond.L.Unlock()
 
diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go
index 040f2d343..6a7cf026d 100644
--- a/htlcswitch/mailbox_test.go
+++ b/htlcswitch/mailbox_test.go
@@ -38,6 +38,9 @@ func TestMailBoxCouriers(t *testing.T) {
 		outgoingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
 		incomingChanID: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())),
 		amount:         lnwire.MilliSatoshi(prand.Int63()),
+		htlc: &lnwire.UpdateAddHTLC{
+			ID: uint64(i),
+		},
 	}
 	sentPackets[i] = pkt
 
@@ -315,6 +318,106 @@ func TestMailBoxFailAdd(t *testing.T) {
 	// duplicate fails are sent.
 	go failAdds(adds)
 	ctx.checkFails(nil)
+
+}
+
+// TestMailBoxPacketPrioritization asserts that the mailbox will prioritize
+// delivering Settle and Fail packets over Adds if both are available for
+// delivery at the same time.
+func TestMailBoxPacketPrioritization(t *testing.T) {
+	t.Parallel()
+
+	// First, we'll create new instance of the current default mailbox
+	// type.
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock:  clock.NewDefaultClock(),
+		expiry: time.Minute,
+	})
+	mailBox.Start()
+	defer mailBox.Stop()
+
+	const numPackets = 5
+
+	_, _, aliceChanID, bobChanID := genIDs()
+
+	// Next we'll send the following sequence of packets:
+	//  - Settle1
+	//  - Add1
+	//  - Add2
+	//  - Fail
+	//  - Settle2
+	sentPackets := make([]*htlcPacket, numPackets)
+	for i := 0; i < numPackets; i++ {
+		pkt := &htlcPacket{
+			outgoingChanID: aliceChanID,
+			outgoingHTLCID: uint64(i),
+			incomingChanID: bobChanID,
+			incomingHTLCID: uint64(i),
+			amount:         lnwire.MilliSatoshi(prand.Int63()),
+		}
+
+		switch i {
+		case 0, 4:
+			// The first and last packets are Settles. A non-Add is
+			// sent first to make the test deterministic w/o needing
+			// to sleep.
+			pkt.htlc = &lnwire.UpdateFulfillHTLC{ID: uint64(i)}
+		case 1, 2:
+			// The next two packets are Adds.
+			pkt.htlc = &lnwire.UpdateAddHTLC{ID: uint64(i)}
+		case 3:
+			// The fourth packet is a Fail.
+			pkt.htlc = &lnwire.UpdateFailHTLC{ID: uint64(i)}
+		}
+
+		sentPackets[i] = pkt
+
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("failed to add packet: %v", err)
+		}
+	}
+
+	// When dequeueing the packets, we expect the following sequence:
+	//  - Settle1
+	//  - Fail
+	//  - Settle2
+	//  - Add1
+	//  - Add2
+	//
+	// We expect Fail and Settle2 to be delivered before either Add1
+	// or Add2 due to the prioritization between the split queues.
+	for i := 0; i < numPackets; i++ {
+		select {
+		case pkt := <-mailBox.PacketOutBox():
+			var expPkt *htlcPacket
+			switch i {
+			case 0:
+				// First packet should be Settle1.
+				expPkt = sentPackets[0]
+			case 1:
+				// Second packet should be Fail.
+				expPkt = sentPackets[3]
+			case 2:
+				// Third packet should be Settle2.
+				expPkt = sentPackets[4]
+			case 3:
+				// Fourth packet should be Add1.
+				expPkt = sentPackets[1]
+			case 4:
+				// Last packet should be Add2.
+				expPkt = sentPackets[2]
+			}
+
+			if !reflect.DeepEqual(expPkt, pkt) {
+				t.Fatalf("recvd packet mismatch %d, want: %v, got: %v",
+					i, spew.Sdump(expPkt), spew.Sdump(pkt))
+			}
+
+		case <-time.After(50 * time.Millisecond):
+			t.Fatalf("didn't receive packet %d before timeout", i)
+		}
+	}
+}
 
 // TestMailOrchestrator asserts that the orchestrator properly buffers packets
@@ -346,6 +449,9 @@ func TestMailOrchestrator(t *testing.T) {
 			incomingChanID: bobChanID,
 			incomingHTLCID: uint64(i),
 			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(i),
+			},
 		}
 		sentPackets[i] = pkt
 
@@ -411,6 +517,9 @@ func TestMailOrchestrator(t *testing.T) {
 			incomingChanID: bobChanID,
 			incomingHTLCID: uint64(halfPackets + i),
 			amount:         lnwire.MilliSatoshi(prand.Int63()),
+			htlc: &lnwire.UpdateAddHTLC{
+				ID: uint64(halfPackets + i),
+			},
 		}
 		sentPackets[i] = pkt

From e7ece11c29ff2e1cb9fef54e7850f80c2f45c828 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 14 Apr 2020 10:50:07 -0700
Subject: [PATCH 08/13] htlcswitch/mailbox: fail htlcs when delayed for 1 minute

Now that packet failure is handled by the mailbox, we can enforce a
delivery deadline and fail the packet if the deadline is exceeded.
This gives senders quicker feedback about tried routes, and allows them
to try alternative paths to the destination in the meantime.
--- htlcswitch/link.go | 7 +++ htlcswitch/mailbox.go | 63 ++++++++++++++++++++++++--- htlcswitch/mailbox_test.go | 89 +++++++++++++++++++++++++++++++++--- 3 files changed, 145 insertions(+), 14 deletions(-)

diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index afdd50cc0..27102d550 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -509,6 +509,13 @@ func (l *channelLink) Stop() {
 	close(l.quit)
 	l.wg.Wait()
 
+	// Now that the htlcManager has completely exited, reset the packet
+	// courier. This allows the mailbox to re-evaluate any lingering Adds
+	// that were delivered but didn't make it into a commitment, failing
+	// them back if the link is offline for an extended period of time. The
+	// error is ignored since it can only fail when the daemon is exiting.
+	_ = l.mailBox.ResetPackets()
+
 	// As a final precaution, we will attempt to flush any uncommitted
 	// preimages to the preimage cache. The preimages should be re-delivered
 	// after channel reestablishment, however this adds an extra layer of
diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go
index 4eff70260..dd5b4a641 100644
--- a/htlcswitch/mailbox.go
+++ b/htlcswitch/mailbox.go
@@ -253,9 +253,16 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
 		// It's possible that the courier has already advanced the
 		// addHead, so this check prevents the addHead from getting
 		// desynchronized.
+		//
+		// NOTE: While this event is rare for Settles or Fails, it could
+		// be very common for Adds since the mailbox has the ability to
+		// cancel Adds before they are delivered. When that occurs, the
+		// head of addPkts has only been peeked and we expect to be
+		// removing the head of the queue.
 		if entry == m.addHead {
 			m.addHead = entry.Next()
 		}
+
 		m.addPkts.Remove(entry)
 		delete(m.addIndex, inKey)
 
@@ -314,6 +321,18 @@ func (m *memoryMailBox) signalUntilShutdown(cType courierType) {
 	}
 }
 
+// pktWithExpiry wraps an incoming packet and records the time at which it
+// should be canceled from the mailbox. This will be used to detect if it gets
+// stuck in the mailbox and determine when to cancel it back.
+type pktWithExpiry struct {
+	pkt    *htlcPacket
+	expiry time.Time
+}
+
+func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
+	return clock.TickAfter(p.expiry.Sub(clock.Now()))
+}
+
 // mailCourier is a dedicated goroutine whose job is to reliably deliver
 // messages of a particular type. There are two types of couriers: wire
 // couriers, and mail couriers. Depending on the passed courierType, this
@@ -364,6 +383,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 					m.addHead = m.addPkts.Front()
 
 					close(pktDone)
+
 				case <-m.quit:
 					m.pktCond.L.Unlock()
 					return
@@ -375,7 +395,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		var (
 			nextPkt   *htlcPacket
 			nextPktEl *list.Element
-			nextAdd   *htlcPacket
+			nextAdd   *pktWithExpiry
 			nextAddEl *list.Element
 			nextMsg   lnwire.Message
 		)
 		switch cType {
@@ -412,13 +432,18 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		// doesn't make it into a commitment, then it'll be
 		// re-delivered once the link comes back online.
 		case pktCourier:
-			// Peek at the next item to deliver, prioritizing
-			// Settle/Fail packets over Adds.
+			// Peek at the head of the Settle/Fails and Add queues.
+			// We peek at both even if a Settle/Fail is present
+			// because we need to set a deadline for the next
+			// pending Add if it's present. Due to clock
+			// monotonicity, we know that the head of the Adds is
+			// the next to expire.
 			if m.pktHead != nil {
 				nextPkt = m.pktHead.Value.(*htlcPacket)
 				nextPktEl = m.pktHead
-			} else {
-				nextAdd = m.addHead.Value.(*htlcPacket)
+			}
+			if m.addHead != nil {
+				nextAdd = m.addHead.Value.(*pktWithExpiry)
 				nextAddEl = m.addHead
 			}
 		}
@@ -453,6 +480,22 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		case pktCourier:
 			var (
 				pktOutbox chan *htlcPacket
 				addOutbox chan *htlcPacket
+				add       *htlcPacket
+				deadline  <-chan time.Time
 			)
 
 			if nextPkt != nil {
 				pktOutbox = m.pktOutbox
 			} else {
 				addOutbox = m.pktOutbox
 			}
 
+			// If we have a pending Add, we'll also construct the
+			// deadline so we can fail it back if we are unable to
+			// deliver any message in time. We also dereference the
+			// nextAdd's packet, since we will need access to it in
+			// the case we are delivering it and/or if the deadline
+			// expires.
+			//
+			// NOTE: It's possible after this point for add to be
+			// nil, but this can only occur when addOutbox is also
+			// nil, hence we won't accidentally deliver a nil
+			// packet.
+			if nextAdd != nil {
+				add = nextAdd.pkt
+				deadline = nextAdd.deadline(m.cfg.clock)
+			}
+
 			select {
 			case pktOutbox <- nextPkt:
 				m.pktCond.L.Lock()
@@ -463,7 +506,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 				}
 				m.pktCond.L.Unlock()
 
-			case addOutbox <- nextAdd:
+			case addOutbox <- add:
 				m.pktCond.L.Lock()
 				// Only advance the addHead if this Add is still
 				// at the head of the queue.
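The same nil-channel idiom carries over to the receive side in the next hunk: when no Add is pending, deadline remains nil and the expiry case can never fire. A compact illustration, with an arbitrary second case standing in for the courier's other work:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var deadline <-chan time.Time // nil: this receive blocks forever

	select {
	case <-deadline:
		fmt.Println("unreachable while deadline is nil")
	case <-time.After(10 * time.Millisecond):
		fmt.Println("other ready cases proceed as usual")
	}
}
```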
@@ -472,6 +515,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 				}
 				m.pktCond.L.Unlock()
 
+			case <-deadline:
+				m.FailAdd(add)
+
 			case pktDone := <-m.pktReset:
 				m.pktCond.L.Lock()
 				m.pktHead = m.htlcPkts.Front()
 				m.addHead = m.addPkts.Front()
 				m.pktCond.L.Unlock()
 
 				close(pktDone)
@@ -534,7 +580,10 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 			return nil
 		}
 
-		entry := m.addPkts.PushBack(pkt)
+		entry := m.addPkts.PushBack(&pktWithExpiry{
+			pkt:    pkt,
+			expiry: m.cfg.clock.Now().Add(m.cfg.expiry),
+		})
 		m.addIndex[pkt.inKey()] = entry
 		if m.addHead == nil {
 			m.addHead = entry
diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go
index 6a7cf026d..999468dce 100644
--- a/htlcswitch/mailbox_test.go
+++ b/htlcswitch/mailbox_test.go
@@ -179,8 +179,8 @@
 
 type mailboxContext struct {
 	t        *testing.T
-	clock    *clock.TestClock
 	mailbox  MailBox
+	clock    *clock.TestClock
 	forwards chan *htlcPacket
 }
@@ -294,7 +294,15 @@
 // TestMailBoxFailAdd asserts that FailAdd returns a response to the switch
 // under various interleavings with other operations on the mailbox.
 func TestMailBoxFailAdd(t *testing.T) {
-	ctx := newMailboxContext(t, time.Now(), time.Minute)
+	var (
+		batchDelay       = time.Second
+		expiry           = time.Minute
+		firstBatchStart  = time.Now()
+		secondBatchStart = time.Now().Add(batchDelay)
+		thirdBatchStart  = time.Now().Add(2 * batchDelay)
+		thirdBatchExpiry = thirdBatchStart.Add(expiry)
+	)
+	ctx := newMailboxContext(t, firstBatchStart, expiry)
 	defer ctx.mailbox.Stop()
 
 	failAdds := func(adds []*htlcPacket) {
@@ -308,30 +316,54 @@ func TestMailBoxFailAdd(t *testing.T) {
 	const numBatchPackets = 5
 
 	// Send a batch of adds, and pull them from the mailbox.
-	adds := ctx.sendAdds(0, numBatchPackets)
-	ctx.receivePkts(adds)
+	firstBatch := ctx.sendAdds(0, numBatchPackets)
+	ctx.receivePkts(firstBatch)
 
 	// Fail all of these adds, simulating an error adding the HTLCs to the
 	// commitment. We should see a failure message for each.
-	go failAdds(adds)
-	ctx.checkFails(adds)
+	go failAdds(firstBatch)
+	ctx.checkFails(firstBatch)
 
 	// As a sanity check, Fail all of them again and assert that no
 	// duplicate fails are sent.
-	go failAdds(adds)
+	go failAdds(firstBatch)
 	ctx.checkFails(nil)
+
+	// Now, send a second batch of adds after a short delay and deliver them
+	// to the link.
+	ctx.clock.SetTime(secondBatchStart)
+	secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets)
+	ctx.receivePkts(secondBatch)
+
+	// Reset the packet queue w/o changing the current time. This simulates
+	// the link flapping and coming back up before the second batch's
+	// expiries have elapsed. We should see no failures sent back.
+	err := ctx.mailbox.ResetPackets()
+	if err != nil {
+		t.Fatalf("unable to reset packets: %v", err)
+	}
+	ctx.checkFails(nil)
+
+	// Redeliver the second batch to the link and hold them there.
+	ctx.receivePkts(secondBatch)
+
+	// Send a third batch of adds shortly after the second batch.
+	ctx.clock.SetTime(thirdBatchStart)
+	thirdBatch := ctx.sendAdds(2*numBatchPackets, numBatchPackets)
+
+	// Advance the clock so that the third batch expires. We expect to only
+	// see fails for the third batch, since the second batch is still being
+	// held by the link.
+	ctx.clock.SetTime(thirdBatchExpiry)
+	ctx.checkFails(thirdBatch)
+
+	// Finally, reset the link which should cause the second batch to be
+	// cancelled immediately.
+ err = ctx.mailbox.ResetPackets() + if err != nil { + t.Fatalf("unable to reset packets: %v", err) + } + ctx.checkFails(secondBatch) } // TestMailBoxPacketPrioritization asserts that the mailbox will prioritize @@ -420,6 +463,38 @@ func TestMailBoxPacketPrioritization(t *testing.T) { } } +// TestMailBoxAddExpiry asserts that the mailbox will cancel back Adds that have +// reached their expiry time. +func TestMailBoxAddExpiry(t *testing.T) { + var ( + expiry = time.Minute + batchDelay = time.Second + firstBatchStart = time.Now() + firstBatchExpiry = firstBatchStart.Add(expiry) + secondBatchStart = firstBatchStart.Add(batchDelay) + secondBatchExpiry = secondBatchStart.Add(expiry) + ) + + ctx := newMailboxContext(t, firstBatchStart, expiry) + defer ctx.mailbox.Stop() + + // Each batch will consist of 10 messages. + const numBatchPackets = 10 + + firstBatch := ctx.sendAdds(0, numBatchPackets) + + ctx.clock.SetTime(secondBatchStart) + ctx.checkFails(nil) + + secondBatch := ctx.sendAdds(numBatchPackets, numBatchPackets) + + ctx.clock.SetTime(firstBatchExpiry) + ctx.checkFails(firstBatch) + + ctx.clock.SetTime(secondBatchExpiry) + ctx.checkFails(secondBatch) +} + // TestMailOrchestrator asserts that the orchestrator properly buffers packets // for channels that haven't been made live, such that they are delivered // immediately after BindLiveShortChanID. It also tests that packets are delivered From 16ad0274c932f1db8c7da9db4ed853630ad7b690 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:50:25 -0700 Subject: [PATCH 09/13] htlcswitch/mailbox: fail on duplicate adds --- htlcswitch/mailbox.go | 16 ++++++++----- htlcswitch/mailbox_test.go | 46 +++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index dd5b4a641..66398e291 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -12,9 +12,15 @@ import ( "github.com/lightningnetwork/lnd/lnwire" ) -// ErrMailBoxShuttingDown is returned when the mailbox is interrupted by a -// shutdown request. -var ErrMailBoxShuttingDown = errors.New("mailbox is shutting down") +var ( + // ErrMailBoxShuttingDown is returned when the mailbox is interrupted by + // a shutdown request. + ErrMailBoxShuttingDown = errors.New("mailbox is shutting down") + + // ErrPacketAlreadyExists signals that an attempt to add a packet failed + // because it already exists in the mailbox. + ErrPacketAlreadyExists = errors.New("mailbox already has packet") +) // MailBox is an interface which represents a concurrent-safe, in-order // delivery queue for messages from the network and also from the main switch. 
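For callers, the new sentinel turns a silent duplicate into a detectable condition. A hedged sketch of call-site handling inside the htlcswitch package (addToMailbox is a hypothetical helper, not part of this change):

```go
func addToMailbox(m MailBox, pkt *htlcPacket) error {
	switch err := m.AddPacket(pkt); err {
	case nil:
		return nil
	case ErrPacketAlreadyExists:
		// The incoming circuit key is already queued, so treat the
		// replay as a no-op rather than a hard failure.
		return nil
	default:
		return err
	}
}
```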
@@ -564,7 +570,7 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 	case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
 		if _, ok := m.pktIndex[pkt.inKey()]; ok {
 			m.pktCond.L.Unlock()
-			return nil
+			return ErrPacketAlreadyExists
 		}
 
 		entry := m.htlcPkts.PushBack(pkt)
@@ -577,7 +583,7 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 	case *lnwire.UpdateAddHTLC:
 		if _, ok := m.addIndex[pkt.inKey()]; ok {
 			m.pktCond.L.Unlock()
-			return nil
+			return ErrPacketAlreadyExists
 		}
 
 		entry := m.addPkts.PushBack(&pktWithExpiry{
diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go
index 999468dce..655caaffe 100644
--- a/htlcswitch/mailbox_test.go
+++ b/htlcswitch/mailbox_test.go
@@ -44,7 +44,10 @@ func TestMailBoxCouriers(t *testing.T) {
 		}
 		sentPackets[i] = pkt
 
-		mailBox.AddPacket(pkt)
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("unable to add packet: %v", err)
+		}
 	}
 
 	// Next, we'll do the same, but this time adding wire messages.
@@ -495,6 +498,47 @@ func TestMailBoxAddExpiry(t *testing.T) {
 	ctx.checkFails(secondBatch)
 }
 
+// TestMailBoxDuplicateAddPacket asserts that the mailbox returns an
+// ErrPacketAlreadyExists failure when two htlcPackets are added with identical
+// incoming circuit keys.
+func TestMailBoxDuplicateAddPacket(t *testing.T) {
+	t.Parallel()
+
+	mailBox := newMemoryMailBox(&mailBoxConfig{
+		clock: clock.NewDefaultClock(),
+	})
+	mailBox.Start()
+	defer mailBox.Stop()
+
+	addTwice := func(t *testing.T, pkt *htlcPacket) {
+		// The first add should succeed.
+		err := mailBox.AddPacket(pkt)
+		if err != nil {
+			t.Fatalf("unable to add packet: %v", err)
+		}
+
+		// Adding again with the same incoming circuit key should fail.
+		err = mailBox.AddPacket(pkt)
+		if err != ErrPacketAlreadyExists {
+			t.Fatalf("expected ErrPacketAlreadyExists, got: %v", err)
+		}
+	}
+
+	// Assert duplicate AddPacket calls fail for all types of HTLCs.
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 0,
+		htlc:           &lnwire.UpdateAddHTLC{},
+	})
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 1,
+		htlc:           &lnwire.UpdateFulfillHTLC{},
+	})
+	addTwice(t, &htlcPacket{
+		incomingHTLCID: 2,
+		htlc:           &lnwire.UpdateFailHTLC{},
+	})
+}
+
 // TestMailOrchestrator asserts that the orchestrator properly buffers packets
 // for channels that haven't been made live, such that they are delivered
 // immediately after BindLiveShortChanID. It also tests that packets are delivered

From 6fca22be2b19f216e830f084ee1b4a65527072b5 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 14 Apr 2020 10:50:45 -0700
Subject: [PATCH 10/13] htlcswitch/link: use return instead of break out

There is no cleanup logic after the loop, so this change is made purely
to improve clarity.
---
 htlcswitch/link.go | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index 27102d550..f474303ec 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -1010,13 +1010,12 @@ func (l *channelLink) htlcManager() {
 		go l.fwdPkgGarbager()
 	}
 
-out:
 	for {
 		// We must always check if we failed at some point processing
 		// the last update before processing the next.
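+		// Note: nothing follows this loop, so each exit path below
+		// returns directly rather than breaking to the old label.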
if l.failed { l.log.Errorf("link failed, exiting htlcManager") - break out + return } // If the previous event resulted in a non-empty batch, resume @@ -1086,7 +1085,7 @@ out: l.cfg.Peer.WipeChannel(chanPoint) }() - break out + return case <-l.cfg.BatchTicker.Ticks(): // Attempt to extend the remote commitment chain @@ -1096,7 +1095,7 @@ out: if err := l.updateCommitTx(); err != nil { l.fail(LinkFailureError{code: ErrInternalError}, "unable to update commitment: %v", err) - break out + return } // A message from the switch was just received. This indicates @@ -1121,11 +1120,11 @@ out: fmt.Sprintf("process hodl queue: %v", err.Error()), ) - break out + return } case <-l.quit: - break out + return } } } From a8977651ccb44f5ab47a75e7321c9a09a73810a1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:51:06 -0700 Subject: [PATCH 11/13] htlcswitch/linkfailure: use whitelist for ShouldSendToPeer --- htlcswitch/linkfailure.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go index c806c4b26..b9ec7596f 100644 --- a/htlcswitch/linkfailure.go +++ b/htlcswitch/linkfailure.go @@ -90,13 +90,23 @@ func (e LinkFailureError) Error() string { // the link fails with this LinkFailureError. func (e LinkFailureError) ShouldSendToPeer() bool { switch e.code { - // If the failure is a result of the peer sending us an error, we don't - // have to respond with one. - case ErrRemoteError: - return false - // In all other cases we will attempt to send our peer an error message. - default: + // Since sending an error can lead some nodes to force close the + // channel, create a whitelist of the failures we want to send so that + // newly added error codes aren't automatically sent to the remote peer. + case + ErrInternalError, + ErrRemoteError, + ErrSyncError, + ErrInvalidUpdate, + ErrInvalidCommitment, + ErrInvalidRevocation, + ErrRecoveryError: + return true + + // In all other cases we will not attempt to send our peer an error. + default: + return false } } From ec1b8d874d41472ca710fc4de89c09931d154556 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:51:30 -0700 Subject: [PATCH 12/13] htlcswitch/link: add pending commit ticker for stall detection This commit adds a PendingCommitTicker to the link config, which allows us to control how quickly we fail the link if the commitment dance stalls. Now that the mailbox has the ability to cancel packets, when the link fails it will reset the mailbox packets on exit, forcing a reevaluation of the HTLCs against their mailbox expiries. --- htlcswitch/link.go | 14 ++++++ htlcswitch/link_test.go | 103 +++++++++++++++++++++++++++++++++++--- htlcswitch/linkfailure.go | 6 +++ htlcswitch/test_utils.go | 1 + peer.go | 1 + 5 files changed, 117 insertions(+), 8 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index f474303ec..c7a86c1e0 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -223,6 +223,11 @@ type ChannelLinkConfig struct { // syncing. FwdPkgGCTicker ticker.Ticker + // PendingCommitTicker is a ticker that allows the link to determine if + // a locally initiated commitment dance gets stuck waiting for the + // remote party to revoke. + PendingCommitTicker ticker.Ticker + // BatchSize is the max size of a batch of updates done to the link // before we do a state update. 
BatchSize uint32 @@ -1098,6 +1103,11 @@ func (l *channelLink) htlcManager() { return } + case <-l.cfg.PendingCommitTicker.Ticks(): + l.fail(LinkFailureError{code: ErrRemoteUnresponsive}, + "unable to complete dance") + return + // A message from the switch was just received. This indicates // that the link is an intermediate hop in a multi-hop HTLC // circuit. @@ -1934,6 +1944,8 @@ func (l *channelLink) updateCommitTx() error { theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment() if err == lnwallet.ErrNoWindow { + l.cfg.PendingCommitTicker.Resume() + l.log.Tracef("revocation window exhausted, unable to send: "+ "%v, pend_updates=%v, dangling_closes%v", l.channel.PendingLocalUpdateCount(), @@ -1953,6 +1965,8 @@ func (l *channelLink) updateCommitTx() error { return err } + l.cfg.PendingCommitTicker.Pause() + // The remote party now has a new pending commitment, so we'll update // the contract court to be aware of this new set (the prior old remote // pending). diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index ffd53a475..935ccde3d 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1699,10 +1699,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - Registry: invoiceRegistry, - ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: bticker, - FwdPkgGCTicker: ticker.NewForce(15 * time.Second), + Registry: invoiceRegistry, + ChainEvents: &contractcourt.ChainEventSubscription{}, + BatchTicker: bticker, + FwdPkgGCTicker: ticker.NewForce(15 * time.Second), + PendingCommitTicker: ticker.New(time.Minute), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -4203,10 +4204,11 @@ func (h *persistentLinkHarness) restartLink( UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - Registry: h.coreLink.cfg.Registry, - ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: bticker, - FwdPkgGCTicker: ticker.New(5 * time.Second), + Registry: h.coreLink.cfg.Registry, + ChainEvents: &contractcourt.ChainEventSubscription{}, + BatchTicker: bticker, + FwdPkgGCTicker: ticker.New(5 * time.Second), + PendingCommitTicker: ticker.New(time.Minute), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -6134,6 +6136,91 @@ func TestChannelLinkReceiveEmptySig(t *testing.T) { aliceLink.Stop() } +// TestPendingCommitTicker tests that a link will fail itself after a timeout if +// the commitment dance stalls out. 
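+// The test forces the ticker interval down to a millisecond so the stall is
+// detected almost immediately, whereas the production configuration in
+// peer.go uses a minute. Note that ErrRemoteUnresponsive is not added to the
+// ShouldSendToPeer whitelist, so a stalled peer is failed locally without
+// being sent an error message.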
+func TestPendingCommitTicker(t *testing.T) {
+	t.Parallel()
+
+	const chanAmt = btcutil.SatoshiPerBitcoin * 5
+	const chanReserve = btcutil.SatoshiPerBitcoin * 1
+	aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
+		newSingleLinkTestHarness(chanAmt, chanReserve)
+	if err != nil {
+		t.Fatalf("unable to create link: %v", err)
+	}
+
+	var (
+		coreLink  = aliceLink.(*channelLink)
+		aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
+	)
+
+	coreLink.cfg.PendingCommitTicker = ticker.NewForce(time.Millisecond)
+
+	linkErrs := make(chan LinkFailureError)
+	coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
+		_ lnwire.ShortChannelID, linkErr LinkFailureError) {
+
+		linkErrs <- linkErr
+	}
+
+	if err := start(); err != nil {
+		t.Fatalf("unable to start test harness: %v", err)
+	}
+	defer cleanUp()
+
+	ctx := linkTestContext{
+		t:          t,
+		aliceLink:  aliceLink,
+		bobChannel: bobChannel,
+		aliceMsgs:  aliceMsgs,
+	}
+
+	// Send an HTLC from Alice to Bob, and signal the batch ticker to sign
+	// a commitment.
+	htlc, _ := generateHtlcAndInvoice(t, 0)
+	ctx.sendHtlcAliceToBob(0, htlc)
+	ctx.receiveHtlcAliceToBob()
+	batchTicker <- time.Now()
+
+	select {
+	case msg := <-aliceMsgs:
+		if _, ok := msg.(*lnwire.CommitSig); !ok {
+			t.Fatalf("expected CommitSig, got: %T", msg)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("alice did not send commit sig")
+	}
+
+	// Check that Alice hasn't failed.
+	select {
+	case linkErr := <-linkErrs:
+		t.Fatalf("link failed unexpectedly: %v", linkErr)
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Without completing the dance, send another HTLC from Alice to Bob.
+	// Since the revocation window has been exhausted, we should see the
+	// link fail itself immediately due to the low pending commit timeout.
+	// In production this would be much longer, e.g. a minute.
+	htlc, _ = generateHtlcAndInvoice(t, 1)
+	ctx.sendHtlcAliceToBob(1, htlc)
+	ctx.receiveHtlcAliceToBob()
+	batchTicker <- time.Now()
+
+	// Assert that we get the expected link failure from Alice.
+	select {
+	case linkErr := <-linkErrs:
+		if linkErr.code != ErrRemoteUnresponsive {
+			t.Fatalf("error code mismatch, "+
+				"want: ErrRemoteUnresponsive, got: %v",
+				linkErr.code)
+		}
+
+	case <-time.After(time.Second):
+		t.Fatalf("did not receive failure")
+	}
+}
+
 // assertFailureCode asserts that an error is of type ClearTextError and that
 // the failure code is as expected.
 func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {
diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go
index b9ec7596f..840a4d8da 100644
--- a/htlcswitch/linkfailure.go
+++ b/htlcswitch/linkfailure.go
@@ -20,6 +20,10 @@ const (
 	// to fail the link.
 	ErrRemoteError
 
+	// ErrRemoteUnresponsive indicates that our peer took too long to
+	// complete a commitment dance.
+	ErrRemoteUnresponsive
+
 	// ErrSyncError indicates that we failed synchronizing the state of the
 	// channel with our peer.
ErrSyncError @@ -71,6 +75,8 @@ func (e LinkFailureError) Error() string { return "internal error" case ErrRemoteError: return "remote error" + case ErrRemoteUnresponsive: + return "remote unresponsive" case ErrSyncError: return "sync error" case ErrInvalidUpdate: diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index c997c4f90..c591e65bc 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1167,6 +1167,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, BatchSize: 10, BatchTicker: ticker.NewForce(testBatchTimeout), FwdPkgGCTicker: ticker.NewForce(fwdPkgTimeout), + PendingCommitTicker: ticker.NewForce(time.Minute), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, diff --git a/peer.go b/peer.go index c6e2ae025..3ae997778 100644 --- a/peer.go +++ b/peer.go @@ -669,6 +669,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, SyncStates: syncStates, BatchTicker: ticker.New(50 * time.Millisecond), FwdPkgGCTicker: ticker.New(time.Minute), + PendingCommitTicker: ticker.New(time.Minute), BatchSize: 10, UnsafeReplay: cfg.UnsafeReplay, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, From 55f90be2a57c74e498ace45cff2bf4aa2890551d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 14 Apr 2020 10:51:48 -0700 Subject: [PATCH 13/13] htlcswitch/mailbox: rename Settle/Fail queue to indicate replies This commit renames the variables associated with processing the Settle/Fail packets to indicate that they are replies. --- htlcswitch/mailbox.go | 66 ++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 66398e291..95d88763e 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -112,10 +112,12 @@ type memoryMailBox struct { messageOutbox chan lnwire.Message msgReset chan chan struct{} - htlcPkts *list.List - pktIndex map[CircuitKey]*list.Element - pktHead *list.Element + // repPkts is a queue for reply packets, e.g. Settles and Fails. + repPkts *list.List + repIndex map[CircuitKey]*list.Element + repHead *list.Element + // addPkts is a dedicated queue for Adds. addPkts *list.List addIndex map[CircuitKey]*list.Element addHead *list.Element @@ -136,13 +138,13 @@ func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox { box := &memoryMailBox{ cfg: cfg, wireMessages: list.New(), - htlcPkts: list.New(), + repPkts: list.New(), addPkts: list.New(), messageOutbox: make(chan lnwire.Message), pktOutbox: make(chan *htlcPacket), msgReset: make(chan chan struct{}, 1), pktReset: make(chan chan struct{}, 1), - pktIndex: make(map[CircuitKey]*list.Element), + repIndex: make(map[CircuitKey]*list.Element), addIndex: make(map[CircuitKey]*list.Element), wireShutdown: make(chan struct{}), pktShutdown: make(chan struct{}), @@ -238,17 +240,17 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool { m.pktCond.L.Lock() defer m.pktCond.L.Unlock() - if entry, ok := m.pktIndex[inKey]; ok { + if entry, ok := m.repIndex[inKey]; ok { // Check whether we are removing the head of the queue. If so, // we must advance the head to the next packet before removing. // It's possible that the courier has already advanced the - // pktHead, so this check prevents the pktHead from getting + // repHead, so this check prevents the repHead from getting // desynchronized. 
-		if entry == m.pktHead {
-			m.pktHead = entry.Next()
+		if entry == m.repHead {
+			m.repHead = entry.Next()
 		}
-		m.htlcPkts.Remove(entry)
-		delete(m.pktIndex, inKey)
+		m.repPkts.Remove(entry)
+		delete(m.repIndex, inKey)
 
 		return true
 	}
@@ -282,7 +284,7 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
 // bound for the switch that already have a queued response.
 func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
 	m.pktCond.L.Lock()
-	_, ok := m.pktIndex[inKey]
+	_, ok := m.repIndex[inKey]
 	m.pktCond.L.Unlock()
 
 	return ok
@@ -376,7 +378,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		case pktCourier:
 			m.pktCond.L.Lock()
-			for m.pktHead == nil && m.addHead == nil {
+			for m.repHead == nil && m.addHead == nil {
 				m.pktCond.Wait()
 
 				select {
@@ -385,7 +387,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 				// any un-ACK'd messages are re-delivered upon
 				// reconnect.
 				case pktDone := <-m.pktReset:
-					m.pktHead = m.htlcPkts.Front()
+					m.repHead = m.repPkts.Front()
 					m.addHead = m.addPkts.Front()
 
 					close(pktDone)
@@ -399,8 +401,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 		}
 
 		var (
-			nextPkt   *htlcPacket
-			nextPktEl *list.Element
+			nextRep   *htlcPacket
+			nextRepEl *list.Element
 			nextAdd   *pktWithExpiry
 			nextAddEl *list.Element
 			nextMsg   lnwire.Message
@@ -424,9 +426,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			// pending Add if it's present. Due to clock
 			// monotonicity, we know that the head of the Adds is
 			// the next to expire.
-			if m.pktHead != nil {
-				nextPkt = m.pktHead.Value.(*htlcPacket)
-				nextPktEl = m.pktHead
+			if m.repHead != nil {
+				nextRep = m.repHead.Value.(*htlcPacket)
+				nextRepEl = m.repHead
 			}
 			if m.addHead != nil {
 				nextAdd = m.addHead.Value.(*pktWithExpiry)
@@ -479,8 +481,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			// same channel, but we can control which is delivered
 			// by exclusively making one nil and the other non-nil.
 			// We know from our loop condition that at least one
-			// nextPkt and nextAdd are non-nil.
-			if nextPkt != nil {
+			// of nextRep and nextAdd is non-nil.
+			if nextRep != nil {
 				pktOutbox = m.pktOutbox
 			} else {
 				addOutbox = m.pktOutbox
@@ -503,12 +505,12 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			}
 
 			select {
-			case pktOutbox <- nextPkt:
+			case pktOutbox <- nextRep:
 				m.pktCond.L.Lock()
-				// Only advance the pktHead if this Settle or
+				// Only advance the repHead if this Settle or
 				// Fail is still at the head of the queue.
-				if m.pktHead != nil && m.pktHead == nextPktEl {
-					m.pktHead = m.pktHead.Next()
+				if m.repHead != nil && m.repHead == nextRepEl {
+					m.repHead = m.repHead.Next()
 				}
 				m.pktCond.L.Unlock()
 
@@ -526,7 +528,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			case pktDone := <-m.pktReset:
 				m.pktCond.L.Lock()
-				m.pktHead = m.htlcPkts.Front()
+				m.repHead = m.repPkts.Front()
 				m.addHead = m.addPkts.Front()
 				m.pktCond.L.Unlock()
 
@@ -566,17 +568,17 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
 	m.pktCond.L.Lock()
 	switch htlc := pkt.htlc.(type) {
 
-	// Split off Settle/Fail packets into the htlcPkts queue.
+	// Split off Settle/Fail packets into the repPkts queue.
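+	// Keeping replies separate from Adds lets the courier prioritize
+	// Settles and Fails over queued Adds, and track expiries for the
+	// Adds alone.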
case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC: - if _, ok := m.pktIndex[pkt.inKey()]; ok { + if _, ok := m.repIndex[pkt.inKey()]; ok { m.pktCond.L.Unlock() return ErrPacketAlreadyExists } - entry := m.htlcPkts.PushBack(pkt) - m.pktIndex[pkt.inKey()] = entry - if m.pktHead == nil { - m.pktHead = entry + entry := m.repPkts.PushBack(pkt) + m.repIndex[pkt.inKey()] = entry + if m.repHead == nil { + m.repHead = entry } // Split off Add packets into the addPkts queue.
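For orientation, the expiry bookkeeping that the addPkts queue relies on can be sketched as follows. This is a reconstruction for illustration only: the Clock interface mirrors what the series assumes from lnd's clock package (Now plus TickAfter), and the deadline helper and stand-in packet type are assumptions rather than code quoted from the patches.

	package mailboxsketch

	import "time"

	// Clock abstracts time so tests can drive expiries deterministically.
	// It mirrors the two methods the patches assume from lnd's clock
	// package.
	type Clock interface {
		// Now returns the current time.
		Now() time.Time

		// TickAfter returns a channel that fires once d has elapsed.
		// For a non-positive d the channel fires immediately, which is
		// what fails already-expired Adds right after a mailbox reset.
		TickAfter(d time.Duration) <-chan time.Time
	}

	// pktWithExpiry pairs a queued Add with the absolute time at which
	// the mailbox should fail it back to the switch.
	type pktWithExpiry struct {
		pkt    interface{} // stand-in for the real *htlcPacket
		expiry time.Time
	}

	// deadline converts the stored absolute expiry into a channel that
	// fires once the expiry has passed according to the supplied clock.
	func (p *pktWithExpiry) deadline(c Clock) <-chan time.Time {
		return c.TickAfter(p.expiry.Sub(c.Now()))
	}

Because the expiry is recorded as an absolute timestamp at AddPacket time, a reset never extends an Add's lifetime: the courier recomputes the remaining duration from the same stored expiry, which is how the tests above observe immediate cancellation after the clock has been advanced.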