diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d95ac1adf..4385eb5bb 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -9,6 +9,8 @@ import ( "io" + "encoding/hex" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" @@ -82,6 +84,10 @@ type channelLink struct { // which may affect behaviour of the service. cfg *ChannelLinkConfig + // queue is used to store the htlc add updates which haven't been + // processed because of the commitment trancation overflow. + queue *packetQueue + // upstream is a channel which responsible for propagating the // received from remote peer messages, with which we have an opened // channel, to handler function. @@ -115,6 +121,7 @@ func NewChannelLink(cfg *ChannelLinkConfig, downstream: make(chan *htlcPacket), control: make(chan interface{}), cancelReasons: make(map[uint64]lnwire.OpaqueReason), + queue: newWaitingQueue(), quit: make(chan struct{}), } } @@ -248,7 +255,31 @@ out: break out } + // Previously add update have been added to the reprocessing + // queue because of the overflooding threat, and now we are + // trying to process it again. + case packet := <-l.queue.pending: + msg := packet.htlc.(*lnwire.UpdateAddHTLC) + log.Infof("Reprocess downstream add update "+ + "with payment hash(%v)", + hex.EncodeToString(msg.PaymentHash[:])) + l.handleDownStreamPkt(packet) + case pkt := <-l.downstream: + // If we have non empty processing queue than in + // order to preserve the order of add updates + // consume it, and process it later. + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) + if ok && l.queue.length() != 0 { + log.Infof("Downstream htlc add update with "+ + "payment hash(%v) have been added to "+ + "reprocessing queue, batch: %v", + hex.EncodeToString(htlc.PaymentHash[:]), + l.batchCounter) + + l.queue.consume(pkt) + continue + } l.handleDownStreamPkt(pkt) case msg := <-l.upstream: @@ -282,7 +313,15 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { // chains. htlc.ChanID = l.ChanID() index, err := l.channel.AddHTLC(htlc) - if err != nil { + if err == lnwallet.ErrMaxHTLCNumber { + log.Infof("Downstream htlc add update with "+ + "payment hash(%v) have been added to "+ + "reprocessing queue, batch: %v", + hex.EncodeToString(htlc.PaymentHash[:]), + l.batchCounter) + l.queue.consume(pkt) + return + } else if err != nil { // TODO: possibly perform fallback/retry logic // depending on type of error @@ -298,6 +337,10 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { err) return } + log.Tracef("Receive downstream htlc with payment hash"+ + "(%v), assign the index: %v, batch: %v", + hex.EncodeToString(htlc.PaymentHash[:]), + index, l.batchCounter+1) htlc.ID = index l.cfg.Peer.SendMessage(htlc) @@ -381,6 +424,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { l.cfg.Peer.Disconnect() return } + log.Tracef("Receive upstream htlc with payment hash(%v), "+ + "assign the index: %v", + hex.EncodeToString(msg.PaymentHash[:]), index) // TODO(roasbeef): perform sanity checks on per-hop payload // * time-lock is sane, fee, chain, etc @@ -611,6 +657,7 @@ func (l *channelLink) processLockedInHtlcs( &lnwire.UpdateFufillHTLC{ PaymentPreimage: pd.RPreimage, }, pd.RHash, pd.Amount)) + l.queue.release() case lnwallet.Fail: opaqueReason := l.cancelReasons[pd.ParentIndex] @@ -625,6 +672,7 @@ func (l *channelLink) processLockedInHtlcs( Reason: opaqueReason, ChanID: l.ChanID(), }, pd.RHash, pd.Amount)) + l.queue.release() case lnwallet.Add: blob := l.blobs[pd.Index] diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 45c4c0c05..819ae7478 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -126,6 +126,82 @@ func TestChannelLinkSingleHopPayment(t *testing.T) { } } +// TestChannelLinkBidirectionalOneHopPayments tests the ability of channel +// link to cope with bigger number of payment updates that commitment +// transaction may consist. +func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) { + n := newThreeHopNetwork(t, + btcutil.SatoshiPerBitcoin*3, + btcutil.SatoshiPerBitcoin*5, + ) + if err := n.start(); err != nil { + t.Fatal(err) + } + defer n.stop() + + bobBandwidthBefore := n.firstBobChannelLink.Bandwidth() + aliceBandwidthBefore := n.aliceChannelLink.Bandwidth() + + debug := false + if debug { + // Log message that alice receives. + n.aliceServer.record(createLogFunc("alice", + n.aliceChannelLink.ChanID())) + + // Log message that bob receives. + n.bobServer.record(createLogFunc("bob", + n.firstBobChannelLink.ChanID())) + } + + // Send max available payment number in both sides, thereby testing + // the property of channel link to cope with overflowing. + errChan := make(chan error) + count := 2 * lnwallet.MaxHTLCNumber + for i := 0; i < count/2; i++ { + go func() { + _, err := n.makePayment([]Peer{ + n.aliceServer, + n.bobServer, + }, 10) + errChan <- err + }() + } + + for i := 0; i < count/2; i++ { + go func() { + _, err := n.makePayment([]Peer{ + n.bobServer, + n.aliceServer, + }, 10) + errChan <- err + }() + } + + // Check that alice invoice was settled and bandwidth of HTLC + // links was changed. + for i := 0; i < count; i++ { + select { + case err := <-errChan: + if err != nil { + t.Fatalf("unable to make the payment: %v", err) + } + case <-time.After(4 * time.Second): + t.Fatalf("timeout: (%v/%v)", i+1, count) + } + + } + + // At the end Bob and Alice balances should be the same as previous, + // because they sent the equal amount of money to each other. + if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() { + t.Fatal("alice bandwidth shouldn't have changed") + } + + if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() { + t.Fatal("bob bandwidth shouldn't have changed") + } +} + // TestChannelLinkMultiHopPayment checks the ability to send payment over two // hopes. In this test we send the payment from Carol to Alice over Bob peer. // (Carol -> Bob -> Alice) and checking that HTLC was settled properly and diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index b3b5e4cb9..c0aebd53e 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -359,7 +359,7 @@ func (n *threeHopNetwork) makePayment(peers []Peer, select { case err := <-errChan: return invoice, err - case <-time.After(6 * time.Second): + case <-time.After(12 * time.Second): return invoice, errors.New("htlc was no settled in time") } }