diff --git a/htlcswitch.go b/htlcswitch.go index bad6ae2b3..710ab825c 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -51,12 +51,14 @@ type htlcPacket struct { dest chainhash.Hash - index uint32 srcLink wire.OutPoint onion *sphinx.ProcessedPacket msg lnwire.Message - amt btcutil.Amount + + // TODO(roasbeef): refactor and add type to pkt message + payHash [32]byte + amt btcutil.Amount err chan error } @@ -70,16 +72,16 @@ type circuitKey [32]byte // paymentCircuit represents an active Sphinx (onion routing) circuit between // two active links within the htlcSwitch. A payment circuit is created once a -// link forwards an HTLC add request which initites the creation of the ciruit. -// The onion routing informtion contained within this message is used to -// identify the settle/clear ends of the circuit. A circuit may be re-used (not -// torndown) in the case that multiple HTLC's with the send RHash are sent. +// link forwards an HTLC add request which initiates the creation of the +// circuit. The onion routing information contained within this message is +// used to identify the settle/clear ends of the circuit. A circuit may be +// re-used (not torndown) in the case that multiple HTLC's with the send RHash +// are sent. type paymentCircuit struct { // TODO(roasbeef): add reference count so know when to delete? // * atomic int re // * due to same r-value being re-used? - // NOTE: This integer must be used *atomically*. refCount uint32 // clear is the link the htlcSwitch will forward the HTLC add message @@ -290,6 +292,8 @@ out: // state so we can properly forward the ultimate // settle message. case *lnwire.HTLCAddRequest: + payHash := wireMsg.RedemptionHashes[0] + // Create the two ends of the payment circuit // required to ensure completion of this new // payment. @@ -300,6 +304,27 @@ out: if !ok { hswcLog.Errorf("unable to find dest end of "+ "circuit: %x", nextHop) + + // We we're unable to locate the + // next-hop as encoded within the + // Sphinx packet. Therefore, we send a + // cancellation message back to the + // source of the packet so they can + // propagate the message back to the + // origin. + cancelPkt := &htlcPacket{ + payHash: payHash, + msg: &lnwire.CancelHTLC{ + Reason: lnwire.UnknownDestination, + }, + err: make(chan error, 1), + } + + h.chanIndexMtx.RLock() + cancelLink := h.chanIndex[pkt.srcLink] + h.chanIndexMtx.RUnlock() + + cancelLink.linkChan <- cancelPkt continue } @@ -307,8 +332,30 @@ out: settleLink := h.chanIndex[pkt.srcLink] h.chanIndexMtx.RUnlock() - // TODO(roasbeef): examine per-hop info to decide on link? - // * check clear has enough available sat + // If the link we're attempting to forward the + // HTLC over has insufficient capacity, then + // we'll cancel the HTLC as the payment cannot + // succeed. + linkBandwidth := atomic.LoadInt64(&clearLink[0].availableBandwidth) + if linkBandwidth < int64(wireMsg.Amount) { + hswcLog.Errorf("unable to forward HTLC "+ + "link %v has insufficient "+ + "capacity, have %v need %v", + clearLink[0].chanPoint, linkBandwidth, + int64(wireMsg.Amount)) + + pkt := &htlcPacket{ + payHash: payHash, + msg: &lnwire.CancelHTLC{ + Reason: lnwire.InsufficientCapacity, + }, + err: make(chan error, 1), + } + + settleLink.linkChan <- pkt + continue + } + circuit := &paymentCircuit{ clear: clearLink[0], settle: settleLink, @@ -380,6 +427,45 @@ out: circuit.settle.chanPoint, n) satSent += pkt.amt + + delete(h.paymentCircuits, cKey) + + // We've just received an HTLC cancellation triggered + // by an upstream peer somewhere within the ultimate + // route. In response, we'll terminate the payment + // circuit and propagate the error backwards. + case *lnwire.CancelHTLC: + // In order to properly handle the error, well + // need to look up the original circuit that + // the incoming HTLC created. + circuit, ok := h.paymentCircuits[pkt.payHash] + if !ok { + hswcLog.Debugf("No existing circuit "+ + "for %x to cancel", pkt.payHash) + continue + } + + // Since an outgoing HTLC we sent on the clear + // link as he cancelled, we update the + // bandwidth of the clear link, restoring the + // value of the HTLC worth. + n := atomic.AddInt64(&circuit.clear.availableBandwidth, + int64(pkt.amt)) + hswcLog.Debugf("HTLC %x has been cancelled, "+ + "incrementing link %v bandwidth to %v", pkt.payHash, + circuit.clear.chanPoint, n) + + // With our link info updated, we now continue + // the error propagation by sending the + // cancellation message over the link that sent + // us the incoming HTLC. + circuit.settle.linkChan <- &htlcPacket{ + msg: wireMsg, + payHash: pkt.payHash, + err: make(chan error, 1), + } + + delete(h.paymentCircuits, pkt.payHash) } case <-logTicker.C: if numUpdates == 0 { diff --git a/peer.go b/peer.go index a07a2d78b..a53944268 100644 --- a/peer.go +++ b/peer.go @@ -5,6 +5,7 @@ import ( "container/list" "crypto/rand" "encoding/binary" + "errors" "fmt" "net" "sync" @@ -317,8 +318,8 @@ func (p *peer) Disconnect() { // Launch a goroutine to clean up the remaining resources. go func() { // Tell the switch to unregister all links associated with this - // peer. Passing nil as the target link indicates that all links - // associated with this interface should be closed. + // peer. Passing nil as the target link indicates that all + // links associated with this interface should be closed. p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) p.server.donePeers <- p @@ -385,12 +386,16 @@ out: case *lnwire.ErrorGeneric: p.server.fundingMgr.processErrorGeneric(msg, p) + // TODO(roasbeef): create ChanUpdater interface for the below case *lnwire.HTLCAddRequest: isChanUpdate = true targetChan = msg.ChannelPoint case *lnwire.HTLCSettleRequest: isChanUpdate = true targetChan = msg.ChannelPoint + case *lnwire.CancelHTLC: + isChanUpdate = true + targetChan = msg.ChannelPoint case *lnwire.CommitRevocation: isChanUpdate = true targetChan = msg.ChannelPoint @@ -510,6 +515,7 @@ fin: break fin } } + p.wg.Done() peerLog.Tracef("writeHandler for peer %v done", p) } @@ -944,6 +950,16 @@ type commitmentState struct { // many of the pending HTLC's we've received from the upstream peer. htlcsToSettle map[uint32]*channeldb.Invoice + // htlcsToCancel is a set of HTLC's identified by their log index which + // are to be cancelled upon the next state transition. + htlcsToCancel map[uint32]lnwire.CancelReason + + // cancelReasons stores the reason why a particular HTLC was cancelled. + // The index of the HTLC within the log is mapped to the cancellation + // reason. This value is used to thread the proper error through to the + // htlcSwitch, or sub-system that initiated the HTLC. + cancelReasons map[uint32]lnwire.CancelReason + // TODO(roasbeef): use once trickle+batch logic is in pendingBatch []*pendingPayment @@ -967,7 +983,7 @@ type commitmentState struct { switchChan chan<- *htlcPacket // sphinx is an instance of the Sphinx onion Router for this node. The - // router will be used to process all incmoing Sphinx packets embedded + // router will be used to process all incoming Sphinx packets embedded // within HTLC add messages. sphinx *sphinx.Router @@ -1018,6 +1034,8 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, chanPoint: channel.ChannelPoint(), clearedHTCLs: make(map[uint32]*pendingPayment), htlcsToSettle: make(map[uint32]*channeldb.Invoice), + htlcsToCancel: make(map[uint32]lnwire.CancelReason), + cancelReasons: make(map[uint32]lnwire.CancelReason), pendingCircuits: make(map[uint32]*sphinx.ProcessedPacket), sphinx: p.server.sphinx, switchChan: htlcPlex, @@ -1148,6 +1166,9 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { }) case *lnwire.HTLCSettleRequest: + // An HTLC we forward to the switch has just settle somehere + // upstream. Therefore we settle the HTLC within the our local + // state machine. pre := htlc.RedemptionProofs[0] logIndex, err := state.channel.SettleHTLC(pre) if err != nil { @@ -1157,9 +1178,35 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { return } + // With the HTLC settled, we'll need to populate the wire + // message to target the specific channel and HTLC to be + // cancelled. htlc.ChannelPoint = state.chanPoint htlc.HTLCKey = lnwire.HTLCKey(logIndex) + // Then we send the HTLC settle message to the connected peer + // so we can continue the propagation of the settle message. + p.queueMsg(htlc, nil) + isSettle = true + + case *lnwire.CancelHTLC: + // An HTLC cancellation has been triggered somewhere upstream, + // we'll remove then HTLC from our local state machine. + logIndex, err := state.channel.CancelHTLC(pkt.payHash) + if err != nil { + peerLog.Errorf("unable to cancel HTLC: %v", err) + return + } + + // With the HTLC removed, we'll need to populate the wire + // message to target the specific channel and HTLC to be + // cancelled. The "Reason" field will have already been set + // within the switch. + htlc.ChannelPoint = state.chanPoint + htlc.HTLCKey = lnwire.HTLCKey(logIndex) + + // Finally, we send the HTLC message to the peer which + // initially created the HTLC. p.queueMsg(htlc, nil) isSettle = true } @@ -1199,6 +1246,18 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { return } + // We just received an add request from an upstream peer, so we + // add it to our state machine, then add the HTLC to our + // "settle" list in the event that we know the pre-image + index, err := state.channel.ReceiveHTLC(htlcPkt) + if err != nil { + peerLog.Errorf("Receiving HTLC rejected: %v", err) + return + } + + // TODO(roasbeef): perform sanity checks on per-hop payload + // * time-lock is sane, fee, chain, etc + // Attempt to process the Sphinx packet. We include the payment // hash of the HTLC as it's authenticated within the Sphinx // packet itself as associated data in order to thwart attempts @@ -1208,20 +1267,11 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { rHash := htlcPkt.RedemptionHashes[0][:] sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash) if err != nil { + // If we're unable to parse the Sphinx packet, then + // we'll cancel the HTLC after the current commitment + // transition. peerLog.Errorf("unable to process onion pkt: %v", err) - p.Disconnect() - return - } - - // TODO(roasbeef): perform sanity checks on per-hop payload - // * time-lock is sane, fee, chain, etc - - // We just received an add request from an upstream peer, so we - // add it to our state machine, then add the HTLC to our - // "settle" list in the event that we know the pre-image - index, err := state.channel.ReceiveHTLC(htlcPkt) - if err != nil { - peerLog.Errorf("Receiving HTLC rejected: %v", err) + state.htlcsToCancel[index] = lnwire.SphinxParseError return } @@ -1233,14 +1283,29 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { rHash := htlcPkt.RedemptionHashes[0] invoice, err := p.server.invoices.LookupInvoice(rHash) if err != nil { - // TODO(roasbeef): send a canceHTLC message if we can't settle. - peerLog.Errorf("unable to query to locate: %v", err) - p.Disconnect() + // If we're the exit node, but don't recognize + // the payment hash, then we'll fail the HTLC + // on the next state transition. + peerLog.Errorf("unable to settle HTLC, "+ + "payment hash (%x) unrecognized", rHash[:]) + state.htlcsToCancel[index] = lnwire.UnknownPaymentHash return } - // TODO(roasbeef): check values accept if >= - state.htlcsToSettle[index] = invoice + // If we're not currently in debug mode, and the + // extended HTLC doesn't meet the value requested, then + // we'll fail the HTLC. + if !cfg.DebugHTLC && htlcPkt.Amount < invoice.Terms.Value { + peerLog.Errorf("rejecting HTLC due to incorrect "+ + "amount: expected %v, received %v", + invoice.Terms.Value, htlcPkt.Amount) + state.htlcsToCancel[index] = lnwire.IncorrectValue + } else { + // Otherwise, everything is in order and we'll + // settle the HTLC after the current state + // transition. + state.htlcsToSettle[index] = invoice + } // There are additional hops left within this route, so we // track the next hop according to the index of this HTLC @@ -1248,15 +1313,12 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // switch, we'll attach the routing information so the switch // can finalize the circuit. case sphinx.MoreHops: - // TODO(roasbeef): send cancel + error if not in - // routing table state.pendingCircuits[index] = sphinxPacket default: peerLog.Errorf("mal formed onion packet") - p.Disconnect() + state.htlcsToCancel[index] = lnwire.SphinxParseError } case *lnwire.HTLCSettleRequest: - // TODO(roasbeef): this assumes no "multi-sig" pre := htlcPkt.RedemptionProofs[0] idx := uint32(htlcPkt.HTLCKey) if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil { @@ -1268,6 +1330,16 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // TODO(roasbeef): add pre-image to DB in order to swipe // repeated r-values + case *lnwire.CancelHTLC: + idx := uint32(htlcPkt.HTLCKey) + if err := state.channel.ReceiveCancelHTLC(idx); err != nil { + peerLog.Errorf("unable to recv HTLC cancel: %v", err) + p.Disconnect() + return + } + + state.cancelReasons[idx] = htlcPkt.Reason + case *lnwire.CommitSignature: // We just received a new update to our local commitment chain, // validate this new commitment, closing the link if invalid. @@ -1313,15 +1385,29 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { } // If any of the htlc's eligible for forwarding are pending - // settling or timeing out previous outgoing payments, then we + // settling or timing out previous outgoing payments, then we // can them from the pending set, and signal the requester (if // existing) that the payment has been fully fulfilled. var bandwidthUpdate btcutil.Amount settledPayments := make(map[lnwallet.PaymentHash]struct{}) - numSettled := 0 + cancelledHtlcs := make(map[uint32]struct{}) for _, htlc := range htlcsToForward { - if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok { - p.err <- nil + parentIndex := htlc.ParentIndex + if p, ok := state.clearedHTCLs[parentIndex]; ok { + switch htlc.EntryType { + // If the HTLC was settled successfully, then + // we return a nil error back to the possible + // caller. + case lnwallet.Settle: + p.err <- nil + + // Otherwise, the HTLC failed, so we propagate + // the error back to the potential caller. + case lnwallet.Cancel: + errMsg := state.cancelReasons[parentIndex] + p.err <- errors.New(errMsg.String()) + } + delete(state.clearedHTCLs, htlc.ParentIndex) } @@ -1331,54 +1417,82 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { continue } - // If we can't immediately settle this HTLC, then we - // can halt processing here. + // If we can settle this HTLC within our local state + // update log, then send the update entry to the remote + // party. invoice, ok := state.htlcsToSettle[htlc.Index] + if ok { + preimage := invoice.Terms.PaymentPreimage + logIndex, err := state.channel.SettleHTLC(preimage) + if err != nil { + peerLog.Errorf("unable to settle htlc: %v", err) + p.Disconnect() + continue + } + + settleMsg := &lnwire.HTLCSettleRequest{ + ChannelPoint: state.chanPoint, + HTLCKey: lnwire.HTLCKey(logIndex), + RedemptionProofs: [][32]byte{preimage}, + } + p.queueMsg(settleMsg, nil) + + delete(state.htlcsToSettle, htlc.Index) + settledPayments[htlc.RHash] = struct{}{} + + bandwidthUpdate += htlc.Amount + continue + } + + // Alternatively, if we marked this HTLC for + // cancellation, then immediately cancel the HTLC as + // it's now locked in within both commitment + // transactions. + reason, ok := state.htlcsToCancel[htlc.Index] if !ok { continue } - // Otherwise, we settle this HTLC within our local - // state update log, then send the update entry to the - // remote party. - preimage := invoice.Terms.PaymentPreimage - logIndex, err := state.channel.SettleHTLC(preimage) + logIndex, err := state.channel.CancelHTLC(htlc.RHash) if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) + peerLog.Errorf("unable to cancel htlc: %v", err) p.Disconnect() continue } - settleMsg := &lnwire.HTLCSettleRequest{ - ChannelPoint: state.chanPoint, - HTLCKey: lnwire.HTLCKey(logIndex), - RedemptionProofs: [][32]byte{preimage}, + cancelMsg := &lnwire.CancelHTLC{ + ChannelPoint: state.chanPoint, + HTLCKey: lnwire.HTLCKey(logIndex), + Reason: reason, } - p.queueMsg(settleMsg, nil) - delete(state.htlcsToSettle, htlc.Index) + p.queueMsg(cancelMsg, nil) + delete(state.htlcsToCancel, htlc.Index) - bandwidthUpdate += htlc.Amount - settledPayments[htlc.RHash] = struct{}{} - - numSettled++ + cancelledHtlcs[htlc.Index] = struct{}{} } go func() { for _, htlc := range htlcsToForward { // We don't need to forward any HTLC's that we - // just settled above. + // just settled or cancelled above. // TODO(roasbeef): key by index instead? if _, ok := settledPayments[htlc.RHash]; ok { continue } + if _, ok := cancelledHtlcs[htlc.Index]; ok { + continue + } onionPkt := state.pendingCircuits[htlc.Index] delete(state.pendingCircuits, htlc.Index) + reason := state.cancelReasons[htlc.ParentIndex] + delete(state.cancelReasons, htlc.ParentIndex) + // Send this fully activated HTLC to the htlc // switch to continue the chained clear/settle. pkt, err := logEntryToHtlcPkt(*state.chanPoint, - htlc, onionPkt) + htlc, onionPkt, reason) if err != nil { peerLog.Errorf("unable to make htlc pkt: %v", err) @@ -1390,7 +1504,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { }() - if numSettled == 0 { + if len(settledPayments) == 0 && len(cancelledHtlcs) == 0 { return } @@ -1466,11 +1580,12 @@ func (p *peer) updateCommitTx(state *commitmentState) (bool, error) { // logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP) // log entry the corresponding htlcPacket with src/dest set along with the -// proper wire message. This helepr method is provided in order to aide an +// proper wire message. This helper method is provided in order to aide an // htlcManager in forwarding packets to the htlcSwitch. func logEntryToHtlcPkt(chanPoint wire.OutPoint, pd *lnwallet.PaymentDescriptor, - onionPkt *sphinx.ProcessedPacket) (*htlcPacket, error) { + onionPkt *sphinx.ProcessedPacket, + reason lnwire.CancelReason) (*htlcPacket, error) { pkt := &htlcPacket{} @@ -1493,6 +1608,14 @@ func logEntryToHtlcPkt(chanPoint wire.OutPoint, msg = &lnwire.HTLCSettleRequest{ RedemptionProofs: [][32]byte{pd.RPreimage}, } + case lnwallet.Cancel: + // For cancellation messages, we'll also need to set the rHash + // within the htlcPacket so the switch knows on which outbound + // link to forward the cancellation message + msg = &lnwire.CancelHTLC{ + Reason: reason, + } + pkt.payHash = pd.RHash } pkt.amt = pd.Amount