diff --git a/htlcswitch/link.go b/htlcswitch/link.go index b5c66e376..cd2410dd8 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1608,7 +1608,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { return } - // TODO(roasbeef): pipeline to switch + settlePacket := &htlcPacket{ + outgoingChanID: l.ShortChanID(), + outgoingHTLCID: idx, + htlc: &lnwire.UpdateFulfillHTLC{ + PaymentPreimage: pre, + }, + } // Add the newly discovered preimage to our growing list of // uncommitted preimage. These will be written to the witness @@ -1616,6 +1622,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // from the remote peer. l.uncommittedPreimages = append(l.uncommittedPreimages, pre) + // Pipeline this settle, send it to the switch. + go l.forwardBatch(settlePacket) + case *lnwire.UpdateFailMalformedHTLC: // Convert the failure type encoded within the HTLC fail // message to the proper generic lnwire error code. diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6ae141abc..7e4a93026 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -232,10 +232,10 @@ func TestChannelLinkSingleHopPayment(t *testing.T) { t.Fatalf("unable to make the payment: %v", err) } - // Wait for Bob to receive the revocation. + // Wait for Alice to receive the revocation. // // TODO(roasbeef); replace with select over returned err chan - time.Sleep(100 * time.Millisecond) + time.Sleep(2 * time.Second) // Check that alice invoice was settled and bandwidth of HTLC // links was changed. @@ -494,8 +494,8 @@ func testChannelLinkMultiHopPayment(t *testing.T, t.Fatalf("unable to send payment: %v", err) } - // Wait for Bob to receive the revocation. - time.Sleep(100 * time.Millisecond) + // Wait for Alice and Bob's second link to receive the revocation. + time.Sleep(2 * time.Second) // Check that Carol invoice was settled and bandwidth of HTLC // links were changed. @@ -3977,7 +3977,8 @@ func TestChannelLinkAcceptOverpay(t *testing.T) { t.Fatalf("unable to send payment: %v", err) } - time.Sleep(100 * time.Millisecond) + // Wait for Alice and Bob's second link to receive the revocation. + time.Sleep(2 * time.Second) // Even though we sent 2x what was asked for, Carol should still have // accepted the payment and marked it as settled. @@ -5801,7 +5802,12 @@ func TestChannelLinkHoldInvoiceSettle(t *testing.T) { t.Fatal(err) } - // Wait for Bob to receive the revocation. + // Wait for Alice to receive the revocation. This is needed + // because the settles are pipelined to the switch and otherwise + // the bandwidth won't be updated by the time Alice receives a + // response here. + time.Sleep(2 * time.Second) + if ctx.startBandwidthAlice-ctx.amount != ctx.n.aliceChannelLink.Bandwidth() { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index b5ebfcf8e..9c5335a7b 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -175,6 +175,7 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) Notifier: &mockNotifier{}, FwdEventTicker: ticker.NewForce(DefaultFwdEventInterval), LogEventTicker: ticker.NewForce(DefaultLogInterval), + AckEventTicker: ticker.NewForce(DefaultAckInterval), NotifyActiveChannel: func(wire.OutPoint) {}, NotifyInactiveChannel: func(wire.OutPoint) {}, } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 8bcd278ee..22b935bbf 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -30,6 +30,10 @@ const ( // DefaultLogInterval is the duration between attempts to log statistics // about forwarding events. DefaultLogInterval = 10 * time.Second + + // DefaultAckInterval is the duration between attempts to ack any settle + // fails in a forwarding package. + DefaultAckInterval = 15 * time.Second ) var ( @@ -159,6 +163,10 @@ type Config struct { // aggregate stats about it's forwarding during the last interval. LogEventTicker ticker.Ticker + // AckEventTicker is a signal instructing the htlcswitch to ack any settle + // fails in forwarding packages. + AckEventTicker ticker.Ticker + // NotifyActiveChannel and NotifyInactiveChannel allow the link to tell // the ChannelNotifier when channels become active and inactive. NotifyActiveChannel func(wire.OutPoint) @@ -259,6 +267,11 @@ type Switch struct { // active ChainNotifier instance. This will be used to retrieve the // lastest height of the chain. blockEpochStream *chainntnfs.BlockEpochEvent + + // pendingSettleFails is the set of settle/fail entries that we need to + // ack in the forwarding package of the outgoing link. This was added to + // make pipelining settles more efficient. + pendingSettleFails []channeldb.SettleFailRef } // New creates the new instance of htlc switch. @@ -1347,11 +1360,10 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { pkt.outgoingHTLCID) log.Error(err) - // TODO(conner): ack settle/fail if pkt.destRef != nil { - if err := s.ackSettleFail(*pkt.destRef); err != nil { - return nil, err - } + // Add this SettleFailRef to the set of pending settle/fail entries + // awaiting acknowledgement. + s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef) } return nil, err @@ -1366,9 +1378,9 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { // forwarding package of the outgoing link for a payment circuit. We do this if // we're the originator of the payment, so the link stops attempting to // re-broadcast. -func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error { +func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error { return s.cfg.DB.Batch(func(tx *bbolt.Tx) error { - return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef) + return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...) }) } @@ -1533,8 +1545,17 @@ func (s *Switch) htlcForwarder() { s.cfg.FwdEventTicker.Resume() defer s.cfg.FwdEventTicker.Stop() + defer s.cfg.AckEventTicker.Stop() + out: for { + + // If the set of pending settle/fail entries is non-zero, + // reinstate the ack ticker so we can batch ack them. + if len(s.pendingSettleFails) > 0 { + s.cfg.AckEventTicker.Resume() + } + select { case blockEpoch, ok := <-s.blockEpochStream.Epochs: if !ok { @@ -1697,6 +1718,31 @@ out: totalSatSent += diffSatSent totalSatRecv += diffSatRecv + // The ack ticker has fired so if we have any settle/fail entries + // for a forwarding package to ack, we will do so here in a batch + // db call. + case <-s.cfg.AckEventTicker.Ticks(): + // If the current set is empty, pause the ticker. + if len(s.pendingSettleFails) == 0 { + s.cfg.AckEventTicker.Pause() + continue + } + + // Batch ack the settle/fail entries. + if err := s.ackSettleFail(s.pendingSettleFails...); err != nil { + log.Errorf("Unable to ack batch of settle/fails: %v", err) + continue + } + + log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails), + newLogClosure(func() string { + return spew.Sdump(s.pendingSettleFails) + })) + + // Reset the pendingSettleFails buffer while keeping acquired + // memory. + s.pendingSettleFails = s.pendingSettleFails[:0] + case <-s.quit: return } diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 823f3f750..4e4815f55 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -9417,22 +9417,30 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { // Next query for Bob's and Alice's channel states, in order to confirm // that all payment have been successful transmitted. - ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) - aliceChan, err := getChanInfo(ctxt, net.Alice) - if len(aliceChan.PendingHtlcs) != 0 { - t.Fatalf("alice's pending htlcs is incorrect, got %v, "+ - "expected %v", len(aliceChan.PendingHtlcs), 0) - } + + // Wait for the revocation to be received so alice no longer has pending + // htlcs listed and has correct balances. This is needed due to the fact + // that we now pipeline the settles. + err = lntest.WaitPredicate(func() bool { + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + aliceChan, err := getChanInfo(ctxt, net.Alice) + if err != nil { + return false + } + if len(aliceChan.PendingHtlcs) != 0 { + return false + } + if aliceChan.RemoteBalance != bobAmt { + return false + } + if aliceChan.LocalBalance != aliceAmt { + return false + } + + return true + }, time.Second*5) if err != nil { - t.Fatalf("unable to get bob's channel info: %v", err) - } - if aliceChan.RemoteBalance != bobAmt { - t.Fatalf("alice's remote balance is incorrect, got %v, "+ - "expected %v", aliceChan.RemoteBalance, bobAmt) - } - if aliceChan.LocalBalance != aliceAmt { - t.Fatalf("alice's local balance is incorrect, got %v, "+ - "expected %v", aliceChan.LocalBalance, aliceAmt) + t.Fatalf("failed to assert alice's pending htlcs and/or remote/local balance") } // Wait for Bob to receive revocation from Alice. diff --git a/server.go b/server.go index 64b376663..8962e7579 100644 --- a/server.go +++ b/server.go @@ -437,12 +437,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: s.fetchLastChanUpdate(), Notifier: s.cc.chainNotifier, - FwdEventTicker: ticker.New( - htlcswitch.DefaultFwdEventInterval), - LogEventTicker: ticker.New( - htlcswitch.DefaultLogInterval), - NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent, - NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent, + FwdEventTicker: ticker.New(htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New(htlcswitch.DefaultLogInterval), + AckEventTicker: ticker.New(htlcswitch.DefaultAckInterval), + NotifyActiveChannel: s.channelNotifier.NotifyActiveChannelEvent, + NotifyInactiveChannel: s.channelNotifier.NotifyInactiveChannelEvent, }, uint32(currentHeight)) if err != nil { return nil, err diff --git a/test_utils.go b/test_utils.go index 044fee17c..cbeac2b49 100644 --- a/test_utils.go +++ b/test_utils.go @@ -365,6 +365,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, htlcswitch.DefaultFwdEventInterval), LogEventTicker: ticker.New( htlcswitch.DefaultLogInterval), + AckEventTicker: ticker.New( + htlcswitch.DefaultAckInterval), }, uint32(currentHeight)) if err != nil { return nil, nil, nil, nil, err