diff --git a/lnrpc/routerrpc/forward_interceptor.go b/lnrpc/routerrpc/forward_interceptor.go
index c644cd6b6..e0b0d4ca8 100644
--- a/lnrpc/routerrpc/forward_interceptor.go
+++ b/lnrpc/routerrpc/forward_interceptor.go
@@ -150,7 +150,7 @@ func (r *forwardInterceptor) holdAndForwardToClient(
 	htlc := forward.Packet()
 	inKey := htlc.IncomingCircuit
 
-	// ignore already held htlcs.
+	// Ignore already held htlcs.
 	if _, ok := r.holdForwards[inKey]; ok {
 		return nil
 	}
diff --git a/lntest/itest/lnd_forward_interceptor_test.go b/lntest/itest/lnd_forward_interceptor_test.go
index 26af2150b..035576594 100644
--- a/lntest/itest/lnd_forward_interceptor_test.go
+++ b/lntest/itest/lnd_forward_interceptor_test.go
@@ -33,7 +33,168 @@ type interceptorTestCase struct {
 	interceptorAction routerrpc.ResolveHoldForwardAction
 }
 
-// testForwardInterceptor tests the forward interceptor RPC layer.
+// testForwardInterceptorDedupHtlc tests that upon reconnection, duplicate
+// HTLCs aren't re-notified using the HTLC interceptor API.
+func testForwardInterceptorDedupHtlc(net *lntest.NetworkHarness, t *harnessTest) {
+	// Initialize the test context with 3 connected nodes.
+	alice := net.NewNode(t.t, "alice", nil)
+	defer shutdownAndAssert(net, t, alice)
+
+	bob := net.NewNode(t.t, "bob", nil)
+	defer shutdownAndAssert(net, t, bob)
+
+	carol := net.NewNode(t.t, "carol", nil)
+	defer shutdownAndAssert(net, t, carol)
+
+	tc := newInterceptorTestContext(t, net, alice, bob, carol)
+
+	const (
+		chanAmt = btcutil.Amount(300000)
+	)
+
+	// Open and wait for channels.
+	tc.openChannel(tc.alice, tc.bob, chanAmt)
+	tc.openChannel(tc.bob, tc.carol, chanAmt)
+	defer tc.closeChannels()
+	tc.waitForChannels()
+
+	ctxb := context.Background()
+	ctxt, cancelInterceptor := context.WithCancel(ctxb)
+	interceptor, err := tc.bob.RouterClient.HtlcInterceptor(ctxt)
+	require.NoError(tc.t.t, err, "failed to create HtlcInterceptor")
+
+	addResponse, err := tc.carol.AddInvoice(ctxb, &lnrpc.Invoice{
+		ValueMsat: 1000,
+	})
+	require.NoError(tc.t.t, err, "unable to add invoice")
+
+	invoice, err := tc.carol.LookupInvoice(ctxb, &lnrpc.PaymentHash{
+		RHashStr: hex.EncodeToString(addResponse.RHash),
+	})
+	require.NoError(tc.t.t, err, "unable to find invoice")
+
+	// We start the htlc interceptor with a simple implementation that
+	// saves all intercepted packets. These packets are held to simulate a
+	// pending payment.
+	interceptedPacketsMap := &sync.Map{}
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			packet, err := interceptor.Recv()
+			if err != nil {
+				// If it is just the error result of the
+				// context cancellation then we exit silently.
+				status, ok := status.FromError(err)
+				if ok && status.Code() == codes.Canceled {
+					return
+				}
+
+				// Otherwise it is an unexpected error and we
+				// fail the test.
+				require.NoError(
+					tc.t.t, err,
+					"unexpected error in interceptor.Recv()",
+				)
+				return
+			}
+			interceptedPacketsMap.Store(
+				packet.IncomingCircuitKey.HtlcId, packet,
+			)
+		}
+	}()
+
+	// We initiate a payment from Alice.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_, _ = tc.sendAliceToCarolPayment(
+			ctxb, 1000,
+			invoice.RHash, invoice.PaymentAddr,
+		)
+	}()
+
+	// Here we wait for the channel to contain a pending htlc and to be
+	// shown as active.
+	err = wait.Predicate(func() bool {
+		channels, err := tc.bob.ListChannels(ctxt, &lnrpc.ListChannelsRequest{
+			ActiveOnly: true,
+			Peer:       tc.alice.PubKey[:],
+		})
+		if err != nil {
+			return false
+		}
+		if len(channels.Channels) == 0 {
+			return false
+		}
+
+		aliceChan := channels.Channels[0]
+		if len(aliceChan.PendingHtlcs) == 0 {
+			return false
+		}
+		return aliceChan.Active
+	}, defaultTimeout)
+	require.NoError(
+		tc.t.t, err, "alice <> bob channel pending htlc never arrived",
+	)
+
+	// At this point we want to make bob's link send all pending htlcs to
+	// the switch again. We force this behavior by disconnecting and
+	// reconnecting to the peer.
+	if err := tc.net.DisconnectNodes(tc.bob, tc.alice); err != nil {
+		tc.t.Fatalf("failed to disconnect alice and bob")
+	}
+	tc.net.EnsureConnected(tc.t.t, tc.bob, tc.alice)
+
+	// Here we wait for the channel to be active again.
+	err = wait.Predicate(func() bool {
+		req := &lnrpc.ListChannelsRequest{
+			ActiveOnly: true,
+			Peer:       tc.alice.PubKey[:],
+		}
+
+		channels, err := tc.bob.ListChannels(ctxt, req)
+		return err == nil && len(channels.Channels) > 0
+	}, defaultTimeout)
+	require.NoError(
+		tc.t.t, err, "alice <> bob channel didn't re-activate",
+	)
+
+	// Now that the channel is active again, check the payment state from
+	// Alice's perspective.
+	payments, err := tc.alice.ListPayments(ctxb, &lnrpc.ListPaymentsRequest{
+		IncludeIncomplete: true,
+	})
+	require.NoError(tc.t.t, err, "failed to fetch payment")
+
+	// We expect one in-flight payment since we held the htlcs.
+	require.Equal(tc.t.t, 1, len(payments.Payments))
+	require.Equal(tc.t.t, lnrpc.Payment_IN_FLIGHT, payments.Payments[0].Status)
+
+	// We now fail all htlcs to cancel the payment.
+	packetsCount := 0
+	interceptedPacketsMap.Range(func(_, packet interface{}) bool {
+		p := packet.(*routerrpc.ForwardHtlcInterceptRequest)
+		_ = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
+			IncomingCircuitKey: p.IncomingCircuitKey,
+			Action:             routerrpc.ResolveHoldForwardAction_FAIL,
+		})
+		packetsCount++
+		return true
+	})
+
+	// If we counted more than one intercepted htlc at this point, the
+	// same htlc was notified twice, which is exactly the risk we want to
+	// eliminate: if we hold the same htlc twice we could cancel one copy
+	// and settle the other by mistake.
+	require.Equal(tc.t.t, 1, packetsCount)
+
+	cancelInterceptor()
+	wg.Wait()
+}
+
+// testForwardInterceptorBasic tests the forward interceptor RPC layer.
 // The test creates a cluster of 3 connected nodes: Alice -> Bob -> Carol
 // Alice sends 4 different payments to Carol while the interceptor handles
 // differently the htlcs.
@@ -43,7 +204,7 @@ type interceptorTestCase struct {
 // 3. Intercepted held htlcs result in no payment (invoice is not settled).
 // 4. When Interceptor disconnects it resumes all held htlcs, which result in
 //    valid payment (invoice is settled).
-func testForwardInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
+func testForwardInterceptorBasic(net *lntest.NetworkHarness, t *harnessTest) {
 	// Initialize the test context with 3 connected nodes.
 	alice := net.NewNode(t.t, "alice", nil)
 	defer shutdownAndAssert(net, t, alice)
diff --git a/lntest/itest/lnd_test_list_on_test.go b/lntest/itest/lnd_test_list_on_test.go
index b5067433d..bfcb3450d 100644
--- a/lntest/itest/lnd_test_list_on_test.go
+++ b/lntest/itest/lnd_test_list_on_test.go
@@ -312,8 +312,12 @@ var allTestCases = []*testCase{
 		test: testRestAPI,
 	},
 	{
-		name: "intercept forwarded htlc packets",
-		test: testForwardInterceptor,
+		name: "forward interceptor",
+		test: testForwardInterceptorBasic,
+	},
+	{
+		name: "forward interceptor dedup htlcs",
+		test: testForwardInterceptorDedupHtlc,
 	},
 	{
 		name: "wumbo channels",