From 308ad1caf67c3c7eb689b919b7d70503630cf74d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 28 Apr 2018 23:21:43 -0700 Subject: [PATCH] htlcswitch/link_test: add link trimming tests --- htlcswitch/link_test.go | 844 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 822 insertions(+), 22 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 1454847ed..892d5ec97 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "fmt" "io" + "reflect" "runtime" "strings" "sync" @@ -1413,30 +1414,31 @@ func (m *mockPeer) Disconnect(reason error) { var _ Peer = (*mockPeer)(nil) func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( - ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(), error) { - globalEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: make(chan *chainntnfs.BlockEpoch), - Cancel: func() { - }, - } + ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(), + chanRestoreFunc, error) { var chanIDBytes [8]byte if _, err := io.ReadFull(rand.Reader, chanIDBytes[:]); err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } chanID := lnwire.NewShortChanIDFromInt( binary.BigEndian.Uint64(chanIDBytes[:])) - aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel( + aliceChannel, bobChannel, fCleanUp, restore, err := createTestChannel( alicePrivKey, bobPrivKey, chanAmt, chanAmt, chanReserve, chanReserve, chanID, ) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } var ( + globalEpoch = &chainntnfs.BlockEpochEvent{ + Epochs: make(chan *chainntnfs.BlockEpoch), + Cancel: func() { + }, + } invoiceRegistry = newMockRegistry() decoder = newMockIteratorDecoder() obfuscator = NewMockObfuscator() @@ -1444,7 +1446,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( sentMsgs: make(chan lnwire.Message, 2000), quit: make(chan struct{}), } - globalPolicy = ForwardingPolicy{ MinHTLC: lnwire.NewMSatFromSatoshis(5), BaseFee: lnwire.NewMSatFromSatoshis(1), @@ -1461,7 +1462,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( aliceSwitch, err := New(Config{DB: aliceDb}) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, err } t := make(chan time.Time) @@ -1494,11 +1495,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( const startingHeight = 100 aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) - mailbox := newMemoryMailBox() - mailbox.Start() - aliceLink.AttachMailBox(mailbox) - if err := aliceLink.Start(); err != nil { - return nil, nil, nil, nil, err + if err := aliceSwitch.AddLink(aliceLink); err != nil { + return nil, nil, nil, nil, nil, err } go func() { for { @@ -1517,7 +1515,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( defer bobChannel.Stop() } - return aliceLink, bobChannel, t, cleanUp, nil + return aliceLink, bobChannel, t, cleanUp, restore, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -1689,7 +1687,8 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // We'll start the test by creating a single instance of const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt, 0) + aliceLink, bobChannel, tmr, cleanUp, _, err := + newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -2106,7 +2105,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { var mockBlob [lnwire.OnionPacketSize]byte const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, bobChannel, batchTick, cleanUp, err := + aliceLink, bobChannel, batchTick, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, 0) if err != nil { t.Fatalf("unable to create link: %v", err) @@ -2315,6 +2314,557 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { } } +// genAddsAndCircuits creates `numHtlcs` sequential ADD packets and there +// corresponding circuits. The provided `htlc` is used in all test packets. +func genAddsAndCircuits(numHtlcs int, htlc *lnwire.UpdateAddHTLC) ( + []*htlcPacket, []*PaymentCircuit) { + + addPkts := make([]*htlcPacket, 0, numHtlcs) + circuits := make([]*PaymentCircuit, 0, numHtlcs) + for i := 0; i < numHtlcs; i++ { + addPkt := htlcPacket{ + htlc: htlc, + incomingChanID: sourceHop, + incomingHTLCID: uint64(i), + obfuscator: NewMockObfuscator(), + } + + circuit := makePaymentCircuit(&htlc.PaymentHash, &addPkt) + addPkt.circuit = &circuit + + addPkts = append(addPkts, &addPkt) + circuits = append(circuits, &circuit) + } + + return addPkts, circuits +} + +// TestChannelLinkTrimCircuitsPending checks that the switch and link properly +// trim circuits if there are open circuits corresponding to ADDs on a pending +// commmitment transaction. +func TestChannelLinkTrimCircuitsPending(t *testing.T) { + t.Parallel() + + const ( + chanAmt = btcutil.SatoshiPerBitcoin * 5 + numHtlcs = 4 + halfHtlcs = numHtlcs / 2 + ) + + // We'll start by creating a new link with our chanAmt (5 BTC). We will + // only be testing Alice's behavior, so the reference to Bob's channel + // state is unnecessary. + aliceLink, _, batchTicker, cleanUp, restore, err := + newSingleLinkTestHarness(chanAmt, 0) + if err != nil { + t.Fatalf("unable to create link: %v", err) + } + defer cleanUp() + + alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore) + + // Compute the static fees that will be used to determine the + // correctness of Alice's bandwidth when forwarding HTLCs. + estimator := &lnwallet.StaticFeeEstimator{ + FeeRate: 24, + } + feeRate, err := estimator.EstimateFeePerVSize(1) + if err != nil { + t.Fatalf("unable to query fee estimator: %v", err) + } + + defaultCommitFee := alice.channel.StateSnapshot().CommitFee + htlcFee := lnwire.NewMSatFromSatoshis( + feeRate.FeePerKWeight().FeeForWeight(lnwallet.HtlcWeight), + ) + + // The starting bandwidth of the channel should be exactly the amount + // that we created the channel between her and Bob, minus the commitment + // fee. + expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee) + assertLinkBandwidth(t, alice.link, expectedBandwidth) + + // Capture Alice's starting bandwidth to perform later, relative + // bandwidth assertions. + aliceStartingBandwidth := alice.link.Bandwidth() + + // Next, we'll create an HTLC worth 1 BTC that will be used as a dummy + // message for the test. + var mockBlob [lnwire.OnionPacketSize]byte + htlcAmt := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + _, htlc, err := generatePayment(htlcAmt, htlcAmt, 5, mockBlob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + + // Create `numHtlc` htlcPackets and payment circuits that will be used + // to drive the test. All of the packets will use the same dummy HTLC. + addPkts, circuits := genAddsAndCircuits(numHtlcs, htlc) + + // To begin the test, start by committing the circuits belong to our + // first two HTLCs. + fwdActions := alice.commitCircuits(circuits[:halfHtlcs]) + + // Both of these circuits should have successfully added, as this is the + // first attempt to send them. + if len(fwdActions.Adds) != halfHtlcs { + t.Fatalf("expected %d circuits to be added", halfHtlcs) + } + alice.assertNumPendingNumOpenCircuits(2, 0) + + // Since both were committed successfully, we will now deliver them to + // Alice's link. + for _, addPkt := range addPkts[:halfHtlcs] { + if err := alice.link.HandleSwitchPacket(addPkt); err != nil { + t.Fatalf("unable to handle switch packet: %v", err) + } + } + + // Wait until Alice's link has sent both HTLCs via the peer. + alice.checkSent(addPkts[:halfHtlcs]) + + // The resulting bandwidth should reflect that Alice is paying both + // htlc amounts, in addition to both htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + + // Now, initiate a state transition by Alice so that the pending HTLCs + // are locked in. This will *not* involve any participation by Bob, + // which ensures the commitment will remain in a pending state. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(2, 2) + + // Restart Alice's link, which simulates a disconnection with the remote + // peer. + cleanUp = alice.restart(false) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(2, 2) + + // Make a second attempt to commit the first two circuits. This can + // happen if the incoming link flaps, but also allows us to verify that + // the circuits were trimmed properly. + fwdActions = alice.commitCircuits(circuits[:halfHtlcs]) + + // Since Alice has a pending commitment with the first two HTLCs, the + // restart should not have trimmed them from the circuit map. + // Therefore, we expect both of these circuits to be dropped by the + // switch, as keystones should still be set. + if len(fwdActions.Drops) != halfHtlcs { + t.Fatalf("expected %d packets to be dropped", halfHtlcs) + } + + // The resulting bandwidth should remain unchanged from before, + // reflecting that Alice is paying both htlc amounts, in addition to + // both htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + + // Now, restart Alice's link *and* the entire switch. This will ensure + // that entire circuit map is reloaded from disk, and we can now test + // against the behavioral differences of committing circuits that + // conflict with duplicate circuits after a restart. + cleanUp = alice.restart(true) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(2, 2) + + // Alice should not send out any messages. Even though Alice has a + // pending commitment transaction, channel reestablishment is not + // enabled in this test. + select { + case <-alice.msgs: + t.Fatalf("message should not have been sent by Alice") + case <-time.After(time.Second): + } + + // We will now try to commit the circuits for all of our HTLCs. The + // first two are already on the pending commitment transaction, the + // latter two are new HTLCs. + fwdActions = alice.commitCircuits(circuits) + + // The first two circuits should have been dropped, as they are still on + // the pending commitment transaction, and the restart should not have + // trimmed the circuits for these valid HTLCs. + if len(fwdActions.Drops) != halfHtlcs { + t.Fatalf("expected %d packets to be dropped", halfHtlcs) + } + // The latter two circuits are unknown the circuit map, and should + // report being added. + if len(fwdActions.Adds) != halfHtlcs { + t.Fatalf("expected %d packets to be added", halfHtlcs) + } + + // Deliver the latter two HTLCs to Alice's links so that they can be + // processed and added to the in-memory commitment state. + for _, addPkt := range addPkts[halfHtlcs:] { + if err := alice.link.HandleSwitchPacket(addPkt); err != nil { + t.Fatalf("unable to handle switch packet: %v", err) + } + } + + // Wait for Alice to send the two latter HTLCs via the peer. + alice.checkSent(addPkts[halfHtlcs:]) + + // With two HTLCs on the pending commit, and two added to the in-memory + // commitment state, the resulting bandwidth should reflect that Alice + // is paying the all htlc amounts in addition to all htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-numHtlcs*(htlcAmt+htlcFee), + ) + + // We will try to initiate a state transition for Alice, which will + // ensure the circuits for the two in-memory HTLCs are opened. However, + // since we have a pending commitment, these HTLCs will not actually be + // included in a commitment. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(4, 4) + + // Restart Alice's link to simulate a disconnect. Since the switch + // remains up throughout, the two latter HTLCs will remain in the link's + // mailbox, and will reprocessed upon being reattached to the link. + cleanUp = alice.restart(false) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(4, 4) + + // Again, try to recommit all of our circuits. + fwdActions = alice.commitCircuits(circuits) + + // It is expected that all of these will get dropped by the switch. + // The first two circuits are still open as a result of being on the + // commitment transaction. The latter two should have had their open + // circuits trimmed, *but* since the HTLCs are still in Alice's mailbox, + // the switch knows not to fail them as a result of the latter two + // circuits never having been loaded from disk. + if len(fwdActions.Drops) != numHtlcs { + t.Fatalf("expected %d packets to be dropped", numHtlcs) + } + + // Wait for the latter two htlcs to be pulled from the mailbox, added to + // the in-memory channel state, and sent out via the peer. + alice.checkSent(addPkts[halfHtlcs:]) + + // This should result in reconstructing the same bandwidth as our last + // assertion. There are two HTLCs on the pending commit, and two added + // to the in-memory commitment state, the resulting bandwidth should + // reflect that Alice is paying the all htlc amounts in addition to all + // htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-numHtlcs*(htlcAmt+htlcFee), + ) + + // Again, we will try to initiate a state transition for Alice, which + // will ensure the circuits for the two in-memory HTLCs are opened. + // As before, these HTLCs will not actually be included in a commitment + // since we have a pending commitment. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(4, 4) + + // As a final persistence check, we will restart the link and switch, + // wiping the latter two HTLCs from memory, and forcing their circuits + // to be reloaded from disk. + cleanUp = alice.restart(true) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(4, 2) + + // Alice's mailbox will be empty after the restart, and no channel + // reestablishment is configured, so no messages will be sent upon + // restart. + select { + case <-alice.msgs: + t.Fatalf("message should not have been sent by Alice") + case <-time.After(time.Second): + } + + // Finally, make one last attempt to commit all circuits. + fwdActions = alice.commitCircuits(circuits) + + // The first two HTLCs should still be dropped by the htlcswitch. Their + // existence on the pending commitment transaction should prevent their + // open circuits from being trimmed. + if len(fwdActions.Drops) != halfHtlcs { + t.Fatalf("expected %d packets to be dropped", halfHtlcs) + } + // The latter two HTLCs should now be failed by the switch. These will + // have been trimmed by the link or switch restarting, and since the + // HTLCs are known to be lost from memory (since their circuits were + // loaded from disk), it is safe fail them back as they won't ever be + // delivered to the outgoing link. + if len(fwdActions.Fails) != halfHtlcs { + t.Fatalf("expected %d packets to be dropped", halfHtlcs) + } + + // Since the latter two HTLCs have been completely dropped from memory, + // only the first two HTLCs we added should still be reflected in the + // channel bandwidth. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) +} + +// TestChannelLinkTrimCircuitsNoCommit checks that the switch and link properly trim +// circuits if the ADDs corresponding to open circuits are never committed. +func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) { + t.Parallel() + + const ( + chanAmt = btcutil.SatoshiPerBitcoin * 5 + numHtlcs = 4 + halfHtlcs = numHtlcs / 2 + ) + + // We'll start by creating a new link with our chanAmt (5 BTC). We will + // only be testing Alice's behavior, so the reference to Bob's channel + // state is unnecessary. + aliceLink, _, batchTicker, cleanUp, restore, err := + newSingleLinkTestHarness(chanAmt, 0) + if err != nil { + t.Fatalf("unable to create link: %v", err) + } + defer cleanUp() + + alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore) + + // We'll put Alice into hodl.Commit mode, such that the circuits for any + // outgoing ADDs are opened, but the changes are not committed in the + // channel state. + alice.coreLink.cfg.HodlMask = hodl.Commit.Mask() + alice.coreLink.cfg.DebugHTLC = true + + // Compute the static fees that will be used to determine the + // correctness of Alice's bandwidth when forwarding HTLCs. + estimator := &lnwallet.StaticFeeEstimator{ + FeeRate: 24, + } + feeRate, err := estimator.EstimateFeePerVSize(1) + if err != nil { + t.Fatalf("unable to query fee estimator: %v", err) + } + + defaultCommitFee := alice.channel.StateSnapshot().CommitFee + htlcFee := lnwire.NewMSatFromSatoshis( + feeRate.FeePerKWeight().FeeForWeight(lnwallet.HtlcWeight), + ) + + // The starting bandwidth of the channel should be exactly the amount + // that we created the channel between her and Bob, minus the commitment + // fee. + expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee) + assertLinkBandwidth(t, alice.link, expectedBandwidth) + + // Capture Alice's starting bandwidth to perform later, relative + // bandwidth assertions. + aliceStartingBandwidth := alice.link.Bandwidth() + + // Next, we'll create an HTLC worth 1 BTC that will be used as a dummy + // message for the test. + var mockBlob [lnwire.OnionPacketSize]byte + htlcAmt := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + _, htlc, err := generatePayment(htlcAmt, htlcAmt, 5, mockBlob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + + // Create `numHtlc` htlcPackets and payment circuits that will be used + // to drive the test. All of the packets will use the same dummy HTLC. + addPkts, circuits := genAddsAndCircuits(numHtlcs, htlc) + + // To begin the test, start by committing the circuits belong to our + // first two HTLCs. + fwdActions := alice.commitCircuits(circuits[:halfHtlcs]) + + // Both of these circuits should have successfully added, as this is the + // first attempt to send them. + if len(fwdActions.Adds) != halfHtlcs { + t.Fatalf("expected %d circuits to be added", halfHtlcs) + } + + // Since both were committed successfully, we will now deliver them to + // Alice's link. + for _, addPkt := range addPkts[:halfHtlcs] { + if err := alice.link.HandleSwitchPacket(addPkt); err != nil { + t.Fatalf("unable to handle switch packet: %v", err) + } + } + + // Wait until Alice's link has sent both HTLCs via the peer. + alice.checkSent(addPkts[:halfHtlcs]) + + // The resulting bandwidth should reflect that Alice is paying both + // htlc amounts, in addition to both htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + + alice.assertNumPendingNumOpenCircuits(2, 0) + + // Now, init a state transition by Alice to try and commit the HTLCs. + // Since she is in hodl.Commit mode, this will fail, but the circuits + // will be opened persistently. + alice.trySignNextCommitment() + + alice.assertNumPendingNumOpenCircuits(2, 2) + + // Restart Alice's link, which simulates a disconnection with the remote + // peer. Alice's link and switch should trim the circuits that were + // opened but not committed. + cleanUp = alice.restart(false, hodl.Commit) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(2, 2) + + // The first two HTLCs should have been reset in Alice's mailbox since + // the switch was not shutdown. Knowing this the switch should drop the + // two circuits, even if the circuits were trimmed. + fwdActions = alice.commitCircuits(circuits[:halfHtlcs]) + if len(fwdActions.Drops) != halfHtlcs { + t.Fatalf("expected %d packets to be dropped since "+ + "the switch has not been restarted", halfHtlcs) + } + + // Wait for alice to process the first two HTLCs resend them via the + // peer. + alice.checkSent(addPkts[:halfHtlcs]) + + // The resulting bandwidth should reflect that Alice is paying both htlc + // amounts, in addition to both htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + // Again, initiate another state transition by Alice to try and commit + // the HTLCs. Since she is in hodl.Commit mode, this will fail, but the + // circuits will be opened persistently. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(2, 2) + + // Now, we we will do a full restart of the link and switch, configuring + // Alice again in hodl.Commit mode. Since none of the HTLCs were + // actually committed, the previously opened circuits should be trimmed + // by both the link and switch. + cleanUp = alice.restart(true, hodl.Commit) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(2, 0) + + // Attempt another commit of our first two circuits. Both should fail, + // as the opened circuits should have been trimmed, and circuit map + // recognizes that these HTLCs were lost during the restart. + fwdActions = alice.commitCircuits(circuits[:halfHtlcs]) + if len(fwdActions.Fails) != halfHtlcs { + t.Fatalf("expected %d packets to be failed", halfHtlcs) + } + + // Bob should not receive any HTLCs from Alice, since Alice's mailbox is + // empty and there is no pending commitment. + select { + case <-alice.msgs: + t.Fatalf("received unexpected message from Alice") + case <-time.After(time.Second): + } + + // Alice's bandwidth should have reverted back to her starting value. + assertLinkBandwidth(t, alice.link, aliceStartingBandwidth) + + // Now, try to commit the last two payment circuits, which are unused + // thus far. These should succeed without hestiation. + fwdActions = alice.commitCircuits(circuits[halfHtlcs:]) + if len(fwdActions.Adds) != halfHtlcs { + t.Fatalf("expected %d packets to be added", halfHtlcs) + } + + // Deliver the last two HTLCs to the link via Alice's mailbox. + for _, addPkt := range addPkts[halfHtlcs:] { + if err := alice.link.HandleSwitchPacket(addPkt); err != nil { + t.Fatalf("unable to handle switch packet: %v", err) + } + } + + // Verify that Alice processed and sent out the ADD packets via the + // peer. + alice.checkSent(addPkts[halfHtlcs:]) + + // The resulting bandwidth should reflect that Alice is paying both htlc + // amounts, in addition to both htlc fees. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + + // Now, initiate a state transition for Alice. Since we are hodl.Commit + // mode, this will only open the circuits that were added to the + // in-memory channel state. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(4, 2) + + // Restart Alice's link, and place her back in hodl.Commit mode. On + // restart, all previously opened circuits should be trimmed by both the + // link and the switch. + cleanUp = alice.restart(false, hodl.Commit) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(4, 2) + + // Now, try to commit all of known circuits. + fwdActions = alice.commitCircuits(circuits) + + // The first two HTLCs will fail to commit for the same reason as + // before, the circuits have been trimmed. + if len(fwdActions.Fails) != halfHtlcs { + t.Fatalf("expected %d packet to be failed", halfHtlcs) + } + // The last two HTLCs will be dropped, as thought the circuits are + // trimmed, the switch is aware that the HTLCs are still in Alice's + // mailbox. + if len(fwdActions.Drops) != halfHtlcs { + t.Fatalf("expected %d packet to be dropped", halfHtlcs) + } + + // Wait until Alice reprocesses the last two HTLCs and sends them via + // the peer. + alice.checkSent(addPkts[halfHtlcs:]) + + // Her bandwidth should now reflect having sent only those two HTLCs. + assertLinkBandwidth(t, alice.link, + aliceStartingBandwidth-halfHtlcs*(htlcAmt+htlcFee), + ) + + // Now, initiate a state transition for Alice. Since we are hodl.Commit + // mode, this will only open the circuits that were added to the + // in-memory channel state. + alice.trySignNextCommitment() + alice.assertNumPendingNumOpenCircuits(4, 2) + + // Finally, do one last restart of both the link and switch. This will + // flush the HTLCs from the mailbox. The circuits should now be trimmed + // for all of the HTLCs. + cleanUp = alice.restart(true, hodl.Commit) + defer cleanUp() + + alice.assertNumPendingNumOpenCircuits(4, 0) + + // Bob should not receive any HTLCs from Alice, as none of the HTLCs are + // in Alice's mailbox, and channel reestablishment is disabled. + select { + case <-alice.msgs: + t.Fatalf("received unexpected message from Alice") + case <-time.After(time.Second): + } + + // Attempt to commit the last two circuits, both should now fail since + // though they were opened before shutting down, the circuits have been + // properly trimmed. + fwdActions = alice.commitCircuits(circuits[halfHtlcs:]) + if len(fwdActions.Fails) != halfHtlcs { + t.Fatalf("expected %d packet to be failed", halfHtlcs) + } + + // Alice balance should not have changed since the start. + assertLinkBandwidth(t, alice.link, aliceStartingBandwidth) +} + // TestChannelLinkBandwidthChanReserve checks that the bandwidth available // on the channel link reflects the channel reserve that must be kept // at all times. @@ -2325,7 +2875,7 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) { // channel reserve. const chanAmt = btcutil.SatoshiPerBitcoin * 5 const chanReserve = btcutil.SatoshiPerBitcoin * 1 - aliceLink, bobChannel, batchTimer, cleanUp, err := + aliceLink, bobChannel, batchTimer, cleanUp, _, err := newSingleLinkTestHarness(chanAmt, chanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) @@ -2440,8 +2990,8 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) { // should therefore be 0. const bobChanAmt = btcutil.SatoshiPerBitcoin * 1 const bobChanReserve = btcutil.SatoshiPerBitcoin * 1.5 - bobLink, _, _, bobCleanUp, err := newSingleLinkTestHarness(bobChanAmt, - bobChanReserve) + bobLink, _, _, bobCleanUp, _, err := + newSingleLinkTestHarness(bobChanAmt, bobChanReserve) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -3111,3 +3661,253 @@ func TestChannelLinkAcceptOverpay(t *testing.T) { expectedCarolBandwidth, n.carolChannelLink.Bandwidth()) } } + +// chanRestoreFunc is a method signature for functions that can reload both +// endpoints of a link from their persistent storage engines. +type chanRestoreFunc func() (*lnwallet.LightningChannel, *lnwallet.LightningChannel, error) + +// persistentLinkHarness is used to control the lifecylce of a link and the +// switch that operates it. It supports the ability to restart either the link +// or both the link and the switch. +type persistentLinkHarness struct { + t *testing.T + + link ChannelLink + coreLink *channelLink + channel *lnwallet.LightningChannel + + batchTicker chan time.Time + msgs chan lnwire.Message + + restoreChan chanRestoreFunc +} + +// newPersistentLinkHarness initializes a new persistentLinkHarness and derives +// the supporting references from the active link. +func newPersistentLinkHarness(t *testing.T, link ChannelLink, + batchTicker chan time.Time, + restore chanRestoreFunc) *persistentLinkHarness { + + coreLink := link.(*channelLink) + + return &persistentLinkHarness{ + t: t, + link: link, + coreLink: coreLink, + channel: coreLink.channel, + batchTicker: batchTicker, + msgs: coreLink.cfg.Peer.(*mockPeer).sentMsgs, + restoreChan: restore, + } +} + +// restart facilitates a shutdown and restart of the link maintained by the +// harness. The primary purpose of this method is to ensure the consistency of +// the supporting references is maintained across restarts. +// +// If `restartSwitch` is set, the entire switch will also be restarted, +// and will be reinitialized with the contents of the channeldb backing Alice's +// channel. +// +// Any number of hodl flags can be passed as additional arguments to this +// method. If none are provided, the mask will be extracted as hodl.MaskNone. +func (h *persistentLinkHarness) restart(restartSwitch bool, + hodlFlags ...hodl.Flag) func() { + + // First, remove the link from the switch. + h.coreLink.cfg.Switch.RemoveLink(h.link.ChanID()) + + var htlcSwitch *Switch + if restartSwitch { + // If a switch restart is requested, we will stop it and + // leave htlcSwitch nil, which will trigger the creation + // of a fresh instance in restartLink. + h.coreLink.cfg.Switch.Stop() + } else { + // Otherwise, we capture the switch's reference so that + // it can be carried over to the restarted link. + htlcSwitch = h.coreLink.cfg.Switch + } + + // Since our in-memory state may have diverged from our persistent + // state, we will restore the persisted state to ensure we always start + // the link in a consistent state. + var err error + h.channel, _, err = h.restoreChan() + if err != nil { + h.t.Fatalf("unable to restore channels: %v", err) + } + + // Now, restart the link using the channel state. This will take care of + // adding the link to an existing switch, or creating a new one using + // the database owned by the link. + var cleanUp func() + h.link, h.batchTicker, cleanUp, err = restartLink( + h.channel, htlcSwitch, hodlFlags, + ) + if err != nil { + h.t.Fatalf("unable to restart alicelink: %v", err) + } + + // Repopulate the remaining fields in the harness. + h.coreLink = h.link.(*channelLink) + h.msgs = h.coreLink.cfg.Peer.(*mockPeer).sentMsgs + + return cleanUp +} + +// checkSent reads the links message stream and verify that the messages are +// dequeued in the same order as provided by `pkts`. +func (h *persistentLinkHarness) checkSent(pkts []*htlcPacket) { + for _, pkt := range pkts { + var msg lnwire.Message + select { + case msg = <-h.msgs: + case <-time.After(15 * time.Second): + h.t.Fatalf("did not receive message") + } + + if !reflect.DeepEqual(msg, pkt.htlc) { + h.t.Fatalf("unexpected packet, want %v, got %v", + pkt.htlc, msg) + } + } +} + +// commitCircuits accepts a list of circuits and tries to commit them to the +// switch's circuit map. The forwarding actions are returned if there was no +// failure. +func (h *persistentLinkHarness) commitCircuits(circuits []*PaymentCircuit) *CircuitFwdActions { + fwdActions, err := h.coreLink.cfg.Switch.commitCircuits(circuits...) + if err != nil { + h.t.Fatalf("unable to commit circuit: %v", err) + } + + return fwdActions +} + +func (h *persistentLinkHarness) assertNumPendingNumOpenCircuits( + wantPending, wantOpen int) { + + _, _, line, _ := runtime.Caller(1) + + numPending := h.coreLink.cfg.Switch.circuits.NumPending() + if numPending != wantPending { + h.t.Fatalf("line: %d: wrong number of pending circuits: "+ + "want %d, got %d", line, wantPending, numPending) + } + numOpen := h.coreLink.cfg.Switch.circuits.NumOpen() + if numOpen != wantOpen { + h.t.Fatalf("line: %d: wrong number of open circuits: "+ + "want %d, got %d", line, wantOpen, numOpen) + } +} + +// trySignNextCommitment signals the batch ticker so that the link will try to +// update its commitment transaction. +func (h *persistentLinkHarness) trySignNextCommitment() { + select { + case h.batchTicker <- time.Now(): + // Give the link enough time to process the request. + time.Sleep(time.Millisecond * 500) + + case <-time.After(15 * time.Second): + h.t.Fatalf("did not initiate state transition") + } +} + +// restartLink creates a new channel link from the given channel state, and adds +// to an htlcswitch. If none is provided by the caller, a new one will be +// created using Alice's database. +func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, + hodlFlags []hodl.Flag) (ChannelLink, chan time.Time, func(), error) { + + var ( + globalEpoch = &chainntnfs.BlockEpochEvent{ + Epochs: make(chan *chainntnfs.BlockEpoch), + Cancel: func() { + }, + } + invoiceRegistry = newMockRegistry() + decoder = newMockIteratorDecoder() + obfuscator = NewMockObfuscator() + alicePeer = &mockPeer{ + sentMsgs: make(chan lnwire.Message, 2000), + quit: make(chan struct{}), + } + + globalPolicy = ForwardingPolicy{ + MinHTLC: lnwire.NewMSatFromSatoshis(5), + BaseFee: lnwire.NewMSatFromSatoshis(1), + TimeLockDelta: 6, + } + + pCache = &mockPreimageCache{ + // hash -> preimage + preimageMap: make(map[[32]byte][]byte), + } + ) + + aliceDb := aliceChannel.State().Db + + if aliceSwitch == nil { + var err error + aliceSwitch, err = New(Config{DB: aliceDb}) + if err != nil { + return nil, nil, nil, err + } + } + + t := make(chan time.Time) + ticker := &mockTicker{t} + aliceCfg := ChannelLinkConfig{ + FwrdingPolicy: globalPolicy, + Peer: alicePeer, + Switch: aliceSwitch, + Circuits: aliceSwitch.CircuitModifier(), + ForwardPackets: aliceSwitch.ForwardPackets, + DecodeHopIterators: decoder.DecodeHopIterators, + ExtractErrorEncrypter: func(*btcec.PublicKey) ( + ErrorEncrypter, lnwire.FailCode) { + return obfuscator, lnwire.CodeNone + }, + FetchLastChannelUpdate: mockGetChanUpdateMessage, + PreimageCache: pCache, + UpdateContractSignals: func(*contractcourt.ContractSignals) error { + return nil + }, + Registry: invoiceRegistry, + ChainEvents: &contractcourt.ChainEventSubscription{}, + BlockEpochs: globalEpoch, + BatchTicker: ticker, + FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), + // Make the BatchSize large enough to not + // trigger commit update automatically during tests. + BatchSize: 10000, + // Set any hodl flags requested for the new link. + HodlMask: hodl.MaskFromFlags(hodlFlags...), + DebugHTLC: len(hodlFlags) > 0, + } + + const startingHeight = 100 + aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) + if err := aliceSwitch.AddLink(aliceLink); err != nil { + return nil, nil, nil, err + } + go func() { + for { + select { + case <-aliceLink.(*channelLink).htlcUpdates: + case <-aliceLink.(*channelLink).quit: + return + } + } + }() + + cleanUp := func() { + close(alicePeer.quit) + defer aliceLink.Stop() + } + + return aliceLink, t, cleanUp, nil +}