mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-04-13 06:29:07 +02:00
lntemp+itest: refactor testForwardInterceptorDedupHtlc
This commit is contained in:
parent
707d888aa4
commit
97a7638c50
@ -1808,3 +1808,36 @@ func (h *HarnessTest) QueryRoutesAndRetry(hn *node.HarnessNode,
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// ReceiveHtlcInterceptor waits until a message is received on the htlc
|
||||
// interceptor stream or the timeout is reached.
|
||||
func (h *HarnessTest) ReceiveHtlcInterceptor(
|
||||
stream rpc.InterceptorClient) *routerrpc.ForwardHtlcInterceptRequest {
|
||||
|
||||
chanMsg := make(chan *routerrpc.ForwardHtlcInterceptRequest)
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
// Consume one message. This will block until the message is
|
||||
// received.
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
chanMsg <- resp
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(DefaultTimeout):
|
||||
require.Fail(h, "timeout", "timeout intercepting htlc")
|
||||
|
||||
case err := <-errChan:
|
||||
require.Failf(h, "err from stream",
|
||||
"received err from stream: %v", err)
|
||||
|
||||
case updateMsg := <-chanMsg:
|
||||
return updateMsg
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -162,3 +162,18 @@ func (h *HarnessRPC) BuildRoute(
|
||||
h.NoError(err, "BuildRoute")
|
||||
return resp
|
||||
}
|
||||
|
||||
type InterceptorClient routerrpc.Router_HtlcInterceptorClient
|
||||
|
||||
// HtlcInterceptor makes a RPC call to the node's RouterClient and asserts.
|
||||
func (h *HarnessRPC) HtlcInterceptor() (InterceptorClient, context.CancelFunc) {
|
||||
// HtlcInterceptor needs to have the context alive for the entire test
|
||||
// case as the returned client will be used for send and receive events
|
||||
// stream. Thus we use cancel context here instead of a timeout
|
||||
// context.
|
||||
ctxt, cancel := context.WithCancel(h.runCtx)
|
||||
resp, err := h.Router.HtlcInterceptor(ctxt)
|
||||
h.NoError(err, "HtlcInterceptor")
|
||||
|
||||
return resp, cancel
|
||||
}
|
||||
|
@ -409,4 +409,8 @@ var allTestCasesTemp = []*lntemp.TestCase{
|
||||
Name: "sendtoroute amp",
|
||||
TestFunc: testSendToRouteAMP,
|
||||
},
|
||||
{
|
||||
Name: "forward interceptor dedup htlcs",
|
||||
TestFunc: testForwardInterceptorDedupHtlc,
|
||||
},
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -12,8 +13,11 @@ import (
|
||||
"github.com/lightningnetwork/lnd/chainreg"
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
||||
"github.com/lightningnetwork/lnd/lntemp"
|
||||
"github.com/lightningnetwork/lnd/lntemp/node"
|
||||
"github.com/lightningnetwork/lnd/lntest"
|
||||
"github.com/lightningnetwork/lnd/lntest/wait"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -35,163 +39,145 @@ type interceptorTestCase struct {
|
||||
|
||||
// testForwardInterceptorDedupHtlc tests that upon reconnection, duplicate
|
||||
// HTLCs aren't re-notified using the HTLC interceptor API.
|
||||
func testForwardInterceptorDedupHtlc(net *lntest.NetworkHarness, t *harnessTest) {
|
||||
func testForwardInterceptorDedupHtlc(ht *lntemp.HarnessTest) {
|
||||
// Initialize the test context with 3 connected nodes.
|
||||
alice := net.NewNode(t.t, "alice", nil)
|
||||
defer shutdownAndAssert(net, t, alice)
|
||||
ts := newInterceptorTestScenario(ht)
|
||||
|
||||
bob := net.NewNode(t.t, "bob", nil)
|
||||
defer shutdownAndAssert(net, t, bob)
|
||||
|
||||
carol := net.NewNode(t.t, "carol", nil)
|
||||
defer shutdownAndAssert(net, t, carol)
|
||||
|
||||
tc := newInterceptorTestContext(t, net, alice, bob, carol)
|
||||
|
||||
const (
|
||||
chanAmt = btcutil.Amount(300000)
|
||||
)
|
||||
alice, bob, carol := ts.alice, ts.bob, ts.carol
|
||||
|
||||
// Open and wait for channels.
|
||||
tc.openChannel(tc.alice, tc.bob, chanAmt)
|
||||
tc.openChannel(tc.bob, tc.carol, chanAmt)
|
||||
defer tc.closeChannels()
|
||||
tc.waitForChannels()
|
||||
const chanAmt = btcutil.Amount(300000)
|
||||
p := lntemp.OpenChannelParams{Amt: chanAmt}
|
||||
reqs := []*lntemp.OpenChannelRequest{
|
||||
{Local: alice, Remote: bob, Param: p},
|
||||
{Local: bob, Remote: carol, Param: p},
|
||||
}
|
||||
resp := ht.OpenMultiChannelsAsync(reqs)
|
||||
cpAB, cpBC := resp[0], resp[1]
|
||||
|
||||
ctxb := context.Background()
|
||||
ctxt, cancelInterceptor := context.WithCancel(ctxb)
|
||||
interceptor, err := tc.bob.RouterClient.HtlcInterceptor(ctxt)
|
||||
require.NoError(tc.t.t, err, "failed to create HtlcInterceptor")
|
||||
// Make sure Alice is aware of channel Bob=>Carol.
|
||||
ht.AssertTopologyChannelOpen(alice, cpBC)
|
||||
|
||||
addResponse, err := tc.carol.AddInvoice(ctxb, &lnrpc.Invoice{
|
||||
ValueMsat: 1000,
|
||||
})
|
||||
require.NoError(tc.t.t, err, "unable to add invoice")
|
||||
// Connect the interceptor.
|
||||
interceptor, cancelInterceptor := bob.RPC.HtlcInterceptor()
|
||||
|
||||
invoice, err := tc.carol.LookupInvoice(ctxb, &lnrpc.PaymentHash{
|
||||
RHashStr: hex.EncodeToString(addResponse.RHash),
|
||||
})
|
||||
require.NoError(tc.t.t, err, "unable to find invoice")
|
||||
// Prepare the test cases.
|
||||
req := &lnrpc.Invoice{ValueMsat: 1000}
|
||||
addResponse := carol.RPC.AddInvoice(req)
|
||||
invoice := carol.RPC.LookupInvoice(addResponse.RHash)
|
||||
tc := &interceptorTestCase{
|
||||
amountMsat: 1000,
|
||||
invoice: invoice,
|
||||
payAddr: invoice.PaymentAddr,
|
||||
}
|
||||
|
||||
// We initiate a payment from Alice.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
// Signal that all the payments have been sent.
|
||||
defer close(done)
|
||||
|
||||
ts.sendPaymentAndAssertAction(tc)
|
||||
}()
|
||||
|
||||
// We start the htlc interceptor with a simple implementation that
|
||||
// saves all intercepted packets. These packets are held to simulate a
|
||||
// pending payment.
|
||||
interceptedPacketstMap := &sync.Map{}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
packet, err := interceptor.Recv()
|
||||
if err != nil {
|
||||
// If it is just the error result of the
|
||||
// context cancellation the we exit silently.
|
||||
status, ok := status.FromError(err)
|
||||
if ok && status.Code() == codes.Canceled {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise it an unexpected error, we fail
|
||||
// the test.
|
||||
require.NoError(
|
||||
tc.t.t, err,
|
||||
"unexpected error in interceptor.Recv()",
|
||||
)
|
||||
return
|
||||
}
|
||||
interceptedPacketstMap.Store(
|
||||
packet.IncomingCircuitKey.HtlcId, packet,
|
||||
)
|
||||
}
|
||||
}()
|
||||
|
||||
// We initiate a payment from Alice.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, _ = tc.sendAliceToCarolPayment(
|
||||
ctxb, 1000,
|
||||
invoice.RHash, invoice.PaymentAddr,
|
||||
)
|
||||
}()
|
||||
packet := ht.ReceiveHtlcInterceptor(interceptor)
|
||||
|
||||
// Here we should wait for the channel to contain a pending htlc, and
|
||||
// also be shown as being active.
|
||||
err = wait.Predicate(func() bool {
|
||||
channels, err := tc.bob.ListChannels(ctxt, &lnrpc.ListChannelsRequest{
|
||||
ActiveOnly: true,
|
||||
Peer: tc.alice.PubKey[:],
|
||||
})
|
||||
if err != nil {
|
||||
return false
|
||||
err := wait.NoError(func() error {
|
||||
channel := ht.QueryChannelByChanPoint(bob, cpAB)
|
||||
|
||||
if len(channel.PendingHtlcs) == 0 {
|
||||
return fmt.Errorf("expect alice <> bob channel to " +
|
||||
"have pending htlcs")
|
||||
}
|
||||
if len(channels.Channels) == 0 {
|
||||
return false
|
||||
if channel.Active {
|
||||
return nil
|
||||
}
|
||||
|
||||
aliceChan := channels.Channels[0]
|
||||
if len(aliceChan.PendingHtlcs) == 0 {
|
||||
return false
|
||||
}
|
||||
return aliceChan.Active
|
||||
return fmt.Errorf("channel not active")
|
||||
}, defaultTimeout)
|
||||
require.NoError(
|
||||
tc.t.t, err, "alice <> bob channel pending htlc never arrived",
|
||||
ht, err, "alice <> bob channel pending htlc never arrived",
|
||||
)
|
||||
|
||||
// At this point we want to make bob's link send all pending htlcs to
|
||||
// the switch again. We force this behavior by disconnecting and
|
||||
// connecting to the peer.
|
||||
if err := tc.net.DisconnectNodes(tc.bob, tc.alice); err != nil {
|
||||
tc.t.Fatalf("failed to disconnect alice and bob")
|
||||
}
|
||||
tc.net.EnsureConnected(tc.t.t, tc.bob, tc.alice)
|
||||
ht.DisconnectNodes(bob, alice)
|
||||
ht.EnsureConnected(bob, alice)
|
||||
|
||||
// Here we wait for the channel to be active again.
|
||||
err = wait.Predicate(func() bool {
|
||||
req := &lnrpc.ListChannelsRequest{
|
||||
ActiveOnly: true,
|
||||
Peer: tc.alice.PubKey[:],
|
||||
}
|
||||
|
||||
channels, err := tc.bob.ListChannels(ctxt, req)
|
||||
return err == nil && len(channels.Channels) > 0
|
||||
}, defaultTimeout)
|
||||
require.NoError(
|
||||
tc.t.t, err, "alice <> bob channel didn't re-activate",
|
||||
)
|
||||
ht.AssertChannelExists(bob, cpAB)
|
||||
|
||||
// Now that the channel is active we make sure the test passes as
|
||||
// expected.
|
||||
payments, err := tc.alice.ListPayments(ctxb, &lnrpc.ListPaymentsRequest{
|
||||
IncludeIncomplete: true,
|
||||
})
|
||||
require.NoError(tc.t.t, err, "failed to fetch payment")
|
||||
|
||||
// We expect one in flight payment since we held the htlcs.
|
||||
require.Equal(tc.t.t, len(payments.Payments), 1)
|
||||
require.Equal(tc.t.t, payments.Payments[0].Status, lnrpc.Payment_IN_FLIGHT)
|
||||
|
||||
// We now fail all htlcs to cancel the payment.
|
||||
packetsCount := 0
|
||||
interceptedPacketstMap.Range(func(_, packet interface{}) bool {
|
||||
p := packet.(*routerrpc.ForwardHtlcInterceptRequest)
|
||||
_ = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
|
||||
IncomingCircuitKey: p.IncomingCircuitKey,
|
||||
Action: routerrpc.ResolveHoldForwardAction_FAIL,
|
||||
})
|
||||
packetsCount++
|
||||
return true
|
||||
})
|
||||
var preimage lntypes.Preimage
|
||||
copy(preimage[:], invoice.RPreimage)
|
||||
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
|
||||
|
||||
// At this point if we have more than one held htlcs then we should
|
||||
// fail. This means we hold the same htlc twice which is a risk we
|
||||
// want to eliminate. If we don't have the same htlc twice in theory we
|
||||
// can cancel one and settle the other by mistake.
|
||||
require.Equal(tc.t.t, packetsCount, 1)
|
||||
// fail. This means we hold the same htlc twice which is a risk we want
|
||||
// to eliminate. If we don't have the same htlc twice in theory we can
|
||||
// cancel one and settle the other by mistake.
|
||||
errDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(errDone)
|
||||
|
||||
_, err := interceptor.Recv()
|
||||
require.Error(ht, err, "expected an error from interceptor")
|
||||
|
||||
status, ok := status.FromError(err)
|
||||
switch {
|
||||
// If it is just the error result of the context cancellation
|
||||
// the we exit silently.
|
||||
case ok && status.Code() == codes.Canceled:
|
||||
fallthrough
|
||||
|
||||
// When the test ends, during the node's shutdown it will close
|
||||
// the connection.
|
||||
case strings.Contains(err.Error(), "closed network connection"):
|
||||
fallthrough
|
||||
|
||||
case strings.Contains(err.Error(), "EOF"):
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise we receive an unexpected error.
|
||||
require.Failf(ht, "interceptor", "unexpected err: %v", err)
|
||||
}()
|
||||
|
||||
// We now fail all htlcs to cancel the payment.
|
||||
err = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
|
||||
IncomingCircuitKey: packet.IncomingCircuitKey,
|
||||
Action: routerrpc.ResolveHoldForwardAction_FAIL,
|
||||
})
|
||||
require.NoError(ht, err, "failed to send request")
|
||||
|
||||
// Cancel the context, which will disconnect the above interceptor.
|
||||
cancelInterceptor()
|
||||
wg.Wait()
|
||||
|
||||
// Make sure all goroutines are finished.
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(defaultTimeout):
|
||||
require.Fail(ht, "timeout waiting for sending payment")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-errDone:
|
||||
case <-time.After(defaultTimeout):
|
||||
require.Fail(ht, "timeout waiting for interceptor error")
|
||||
}
|
||||
|
||||
// Finally, close channels.
|
||||
ht.CloseChannel(alice, cpAB)
|
||||
ht.CloseChannel(bob, cpBC)
|
||||
}
|
||||
|
||||
// testForwardInterceptorBasic tests the forward interceptor RPC layer.
|
||||
@ -609,3 +595,147 @@ func (c *interceptorTestContext) buildRoute(ctx context.Context, amtMsat int64,
|
||||
|
||||
return routeResp.Route, nil
|
||||
}
|
||||
|
||||
// interceptorTestScenario is a helper struct to hold the test context and
|
||||
// provide the needed functionality.
|
||||
type interceptorTestScenario struct {
|
||||
ht *lntemp.HarnessTest
|
||||
alice, bob, carol *node.HarnessNode
|
||||
}
|
||||
|
||||
// newInterceptorTestScenario initializes a new test scenario with three nodes
|
||||
// and connects them to have the following topology,
|
||||
//
|
||||
// Alice --> Bob --> Carol
|
||||
//
|
||||
// Among them, Alice and Bob are standby nodes and Carol is a new node.
|
||||
func newInterceptorTestScenario(
|
||||
ht *lntemp.HarnessTest) *interceptorTestScenario {
|
||||
|
||||
alice, bob := ht.Alice, ht.Bob
|
||||
carol := ht.NewNode("carol", nil)
|
||||
|
||||
ht.EnsureConnected(alice, bob)
|
||||
ht.EnsureConnected(bob, carol)
|
||||
return &interceptorTestScenario{
|
||||
ht: ht,
|
||||
alice: alice,
|
||||
bob: bob,
|
||||
carol: carol,
|
||||
}
|
||||
}
|
||||
|
||||
// prepareTestCases prepares 4 tests:
|
||||
// 1. failed htlc.
|
||||
// 2. resumed htlc.
|
||||
// 3. settling htlc externally.
|
||||
// 4. held htlc that is resumed later.
|
||||
func (c *interceptorTestScenario) prepareTestCases() []*interceptorTestCase {
|
||||
cases := []*interceptorTestCase{
|
||||
{
|
||||
amountMsat: 1000, shouldHold: false,
|
||||
interceptorAction: routerrpc.ResolveHoldForwardAction_FAIL,
|
||||
},
|
||||
{
|
||||
amountMsat: 1000, shouldHold: false,
|
||||
interceptorAction: routerrpc.ResolveHoldForwardAction_RESUME,
|
||||
},
|
||||
{
|
||||
amountMsat: 1000, shouldHold: false,
|
||||
interceptorAction: routerrpc.ResolveHoldForwardAction_SETTLE,
|
||||
},
|
||||
{
|
||||
amountMsat: 1000, shouldHold: true,
|
||||
interceptorAction: routerrpc.ResolveHoldForwardAction_RESUME,
|
||||
},
|
||||
}
|
||||
|
||||
for _, t := range cases {
|
||||
inv := &lnrpc.Invoice{ValueMsat: t.amountMsat}
|
||||
addResponse := c.carol.RPC.AddInvoice(inv)
|
||||
invoice := c.carol.RPC.LookupInvoice(addResponse.RHash)
|
||||
|
||||
// We'll need to also decode the returned invoice so we can
|
||||
// grab the payment address which is now required for ALL
|
||||
// payments.
|
||||
payReq := c.carol.RPC.DecodePayReq(invoice.PaymentRequest)
|
||||
|
||||
t.invoice = invoice
|
||||
t.payAddr = payReq.PaymentAddr
|
||||
}
|
||||
return cases
|
||||
}
|
||||
|
||||
// sendPaymentAndAssertAction sends a payment from alice to carol and asserts
|
||||
// that the specified interceptor action is taken.
|
||||
func (c *interceptorTestScenario) sendPaymentAndAssertAction(
|
||||
tc *interceptorTestCase) *lnrpc.HTLCAttempt {
|
||||
|
||||
// Build a route from alice to carol.
|
||||
route := c.buildRoute(
|
||||
tc.amountMsat, []*node.HarnessNode{c.bob, c.carol}, tc.payAddr,
|
||||
)
|
||||
|
||||
// Send a custom record to the forwarding node.
|
||||
route.Hops[0].CustomRecords = map[uint64][]byte{
|
||||
customTestKey: customTestValue,
|
||||
}
|
||||
|
||||
// Send the payment.
|
||||
sendReq := &routerrpc.SendToRouteRequest{
|
||||
PaymentHash: tc.invoice.RHash,
|
||||
Route: route,
|
||||
}
|
||||
return c.alice.RPC.SendToRouteV2(sendReq)
|
||||
}
|
||||
|
||||
func (c *interceptorTestScenario) assertAction(tc *interceptorTestCase,
|
||||
attempt *lnrpc.HTLCAttempt) {
|
||||
|
||||
// Now check the expected action has been taken.
|
||||
switch tc.interceptorAction {
|
||||
// For 'fail' interceptor action we make sure the payment failed.
|
||||
case routerrpc.ResolveHoldForwardAction_FAIL:
|
||||
require.Equal(c.ht, lnrpc.HTLCAttempt_FAILED, attempt.Status,
|
||||
"expected payment to fail")
|
||||
|
||||
// Assert that we get a temporary channel failure which has a
|
||||
// channel update.
|
||||
require.NotNil(c.ht, attempt.Failure)
|
||||
require.NotNil(c.ht, attempt.Failure.ChannelUpdate)
|
||||
|
||||
require.Equal(c.ht, lnrpc.Failure_TEMPORARY_CHANNEL_FAILURE,
|
||||
attempt.Failure.Code)
|
||||
|
||||
// For settle and resume we make sure the payment is successful.
|
||||
case routerrpc.ResolveHoldForwardAction_SETTLE:
|
||||
fallthrough
|
||||
|
||||
case routerrpc.ResolveHoldForwardAction_RESUME:
|
||||
require.Equal(c.ht, lnrpc.HTLCAttempt_SUCCEEDED,
|
||||
attempt.Status, "expected payment to succeed")
|
||||
}
|
||||
}
|
||||
|
||||
// buildRoute is a helper function to build a route with given hops.
|
||||
func (c *interceptorTestScenario) buildRoute(amtMsat int64,
|
||||
hops []*node.HarnessNode, payAddr []byte) *lnrpc.Route {
|
||||
|
||||
rpcHops := make([][]byte, 0, len(hops))
|
||||
for _, hop := range hops {
|
||||
k := hop.PubKeyStr
|
||||
pubkey, err := route.NewVertexFromStr(k)
|
||||
require.NoErrorf(c.ht, err, "error parsing %v: %v", k, err)
|
||||
rpcHops = append(rpcHops, pubkey[:])
|
||||
}
|
||||
|
||||
req := &routerrpc.BuildRouteRequest{
|
||||
AmtMsat: amtMsat,
|
||||
FinalCltvDelta: chainreg.DefaultBitcoinTimeLockDelta,
|
||||
HopPubkeys: rpcHops,
|
||||
PaymentAddr: payAddr,
|
||||
}
|
||||
|
||||
routeResp := c.alice.RPC.BuildRoute(req)
|
||||
return routeResp.Route
|
||||
}
|
||||
|
@ -44,10 +44,6 @@ var allTestCases = []*testCase{
|
||||
name: "forward interceptor",
|
||||
test: testForwardInterceptorBasic,
|
||||
},
|
||||
{
|
||||
name: "forward interceptor dedup htlcs",
|
||||
test: testForwardInterceptorDedupHtlc,
|
||||
},
|
||||
{
|
||||
name: "wallet import account",
|
||||
test: testWalletImportAccount,
|
||||
|
Loading…
x
Reference in New Issue
Block a user