diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 36852d36c..6961050ac 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -76,6 +76,14 @@ type ChannelLink interface { // policy to govern if it an incoming HTLC should be forwarded or not. UpdateForwardingPolicy(ForwardingPolicy) + // HtlcSatifiesPolicy should return a nil error if the passed HTLC + // details satisfy the current forwarding policy fo the target link. + // Otherwise, a valid protocol failure message should be returned in + // order to signal to the source of the HTLC, the policy consistency + // issue. + HtlcSatifiesPolicy(payHash [32]byte, + incomingAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage + // Bandwidth returns the amount of milli-satoshis which current link // might pass through channel link. The value returned from this method // represents the up to date available flow through the channel. This diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2a7feae23..d8eecac26 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -79,7 +79,8 @@ type ForwardingPolicy struct { // // TODO(roasbeef): also add in current available channel bandwidth, inverse // func -func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi { +func ExpectedFee(f ForwardingPolicy, + htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi { // TODO(roasbeef): write some basic table driven tests return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 @@ -151,10 +152,12 @@ type ChannelLinkConfig struct { // Sphinx onion blob, and creating onion failure obfuscator. ExtractErrorEncrypter ErrorEncrypterExtracter - // GetLastChannelUpdate retrieves the latest routing policy for this - // particular channel. This will be used to provide payment senders our - // latest policy when sending encrypted error messages. - GetLastChannelUpdate func() (*lnwire.ChannelUpdate, error) + // FetchLastChannelUpdate retrieves the latest routing policy for a + // target channel. This channel will typically be the outgoing channel + // specified when we receive an incoming HTLC. This will be used to + // provide payment senders our latest policy when sending encrypted + // error messages. + FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) // Peer is a lightning network node with which we have the channel link // opened. @@ -310,11 +313,6 @@ type channelLink struct { // by the HTLC switch. downstream chan *htlcPacket - // linkControl is a channel which is used to query the state of the - // link, or update various policies used which govern if an HTLC is to - // be forwarded and/or accepted. - linkControl chan interface{} - // htlcUpdates is a channel that we'll use to update outside // sub-systems with the latest set of active HTLC's on our channel. htlcUpdates chan []channeldb.HTLC @@ -342,7 +340,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, cfg: cfg, channel: channel, shortChanID: channel.ShortChanID(), - linkControl: make(chan interface{}), // TODO(roasbeef): just do reserve here? logCommitTimer: time.NewTimer(300 * time.Millisecond), overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), @@ -920,30 +917,6 @@ out: case msg := <-l.upstream: l.handleUpstreamMsg(msg) - // TODO(roasbeef): make distinct goroutine to handle? - case cmd := <-l.linkControl: - - switch req := cmd.(type) { - case *policyUpdate: - // In order to avoid overriding a valid policy - // with a "null" field in the new policy, we'll - // only update to the set sub policy if the new - // value isn't uninitialized. - if req.policy.BaseFee != 0 { - l.cfg.FwrdingPolicy.BaseFee = req.policy.BaseFee - } - if req.policy.FeeRate != 0 { - l.cfg.FwrdingPolicy.FeeRate = req.policy.FeeRate - } - if req.policy.TimeLockDelta != 0 { - l.cfg.FwrdingPolicy.TimeLockDelta = req.policy.TimeLockDelta - } - - if req.done != nil { - close(req.done) - } - } - case <-l.quit: break out } @@ -1502,6 +1475,9 @@ func (l *channelLink) Peer() Peer { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) ShortChanID() lnwire.ShortChannelID { + l.RLock() + defer l.RUnlock() + return l.shortChanID } @@ -1581,14 +1557,6 @@ func (l *channelLink) AttachMailBox(mailbox MailBox) { l.Unlock() } -// policyUpdate is a message sent to a channel link when an outside sub-system -// wishes to update the current forwarding policy. -type policyUpdate struct { - policy ForwardingPolicy - - done chan struct{} -} - // UpdateForwardingPolicy updates the forwarding policy for the target // ChannelLink. Once updated, the link will use the new forwarding policy to // govern if it an incoming HTLC should be forwarded or not. Note that this @@ -1598,20 +1566,98 @@ type policyUpdate struct { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) { - cmd := &policyUpdate{ - policy: newPolicy, - done: make(chan struct{}), + l.Lock() + defer l.Unlock() + + // In order to avoid overriding a valid policy with a "null" field in + // the new policy, we'll only update to the set sub policy if the new + // value isn't uninitialized. + if newPolicy.BaseFee != 0 { + l.cfg.FwrdingPolicy.BaseFee = newPolicy.BaseFee + } + if newPolicy.FeeRate != 0 { + l.cfg.FwrdingPolicy.FeeRate = newPolicy.FeeRate + } + if newPolicy.TimeLockDelta != 0 { + l.cfg.FwrdingPolicy.TimeLockDelta = newPolicy.TimeLockDelta + } + if newPolicy.MinHTLC != 0 { + l.cfg.FwrdingPolicy.MinHTLC = newPolicy.MinHTLC + } +} + +// HtlcSatifiesPolicy should return a nil error if the passed HTLC details +// satisfy the current forwarding policy fo the target link. Otherwise, a +// valid protocol failure message should be returned in order to signal to the +// source of the HTLC, the policy consistency issue. +// +// NOTE: Part of the ChannelLink interface. +func (l *channelLink) HtlcSatifiesPolicy(payHash [32]byte, + incomingHtlcAmt, amtToForward lnwire.MilliSatoshi) lnwire.FailureMessage { + + l.RLock() + defer l.RUnlock() + + // As our first sanity check, we'll ensure that the passed HTLC isn't + // too small for the next hop. If so, then we'll cancel the HTLC + // directly. + if amtToForward < l.cfg.FwrdingPolicy.MinHTLC { + l.errorf("outgoing htlc(%x) is too small: min_htlc=%v, "+ + "htlc_value=%v", payHash[:], l.cfg.FwrdingPolicy.MinHTLC, + amtToForward) + + // As part of the returned error, we'll send our latest routing + // policy so the sending node obtains the most up to date data. + var failure lnwire.FailureMessage + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) + if err != nil { + failure = lnwire.NewTemporaryChannelFailure(nil) + } else { + failure = lnwire.NewAmountBelowMinimum( + amtToForward, *update, + ) + } + + return failure } - select { - case l.linkControl <- cmd: - case <-l.quit: + // Next, using the amount of the incoming HTLC, we'll calculate the + // expected fee this incoming HTLC must carry in order to satisfy the + // constraints of the outgoing link. + expectedFee := ExpectedFee(l.cfg.FwrdingPolicy, amtToForward) + + // If the actual fee is less than our expected fee, then we'll reject + // this HTLC as it didn't provide a sufficient amount of fees, or the + // values have been tampered with, or the send used incorrect/dated + // information to construct the forwarding information for this hop. In + // any case, we'll cancel this HTLC. + actualFee := incomingHtlcAmt - amtToForward + if incomingHtlcAmt < amtToForward || actualFee < expectedFee { + l.errorf("outgoing htlc(%x) has insufficient "+ + "fee: expected %v, got %v", payHash[:], + int64(expectedFee), + int64(actualFee)) + + // As part of the returned error, we'll send our latest routing + // policy so the sending node obtains the most up to date data. + var failure lnwire.FailureMessage + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) + if err != nil { + failure = lnwire.NewTemporaryChannelFailure(nil) + } else { + failure = lnwire.NewFeeInsufficient( + amtToForward, *update, + ) + } + + return failure } - select { - case <-cmd.done: - case <-l.quit: - } + return nil } // Stats returns the statistics of channel link. @@ -2101,17 +2147,22 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, htlc: addMsg, obfuscator: obfuscator, } - switchPackets = append(switchPackets, - updatePacket) + switchPackets = append( + switchPackets, updatePacket, + ) continue } + // We'll consult the forwarding policy for this link + // when checking time locked related constraints. + hopPolicy := l.cfg.FwrdingPolicy + // We want to avoid forwarding an HTLC which will // expire in the near future, so we'll reject an HTLC // if its expiration time is too close to the current // height. - timeDelta := l.cfg.FwrdingPolicy.TimeLockDelta + timeDelta := hopPolicy.TimeLockDelta if pd.Timeout-timeDelta <= heightNow { log.Errorf("htlc(%x) has an expiry "+ "that's too soon: outgoing_expiry=%v, "+ @@ -2119,7 +2170,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, pd.Timeout-timeDelta, heightNow) var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) if err != nil { failure = lnwire.NewTemporaryChannelFailure(nil) } else { @@ -2127,78 +2180,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, } l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, - ) - needUpdate = true - continue - } - - // As our second sanity check, we'll ensure that the - // passed HTLC isn't too small. If so, then - // we'll cancel the HTLC directly. - if pd.Amount < l.cfg.FwrdingPolicy.MinHTLC { - log.Errorf("Incoming htlc(%x) is too "+ - "small: min_htlc=%v, htlc_value=%v", - pd.RHash[:], l.cfg.FwrdingPolicy.MinHTLC, - pd.Amount) - - // As part of the returned error, we'll send - // our latest routing policy so the sending - // node obtains the most up to date data. - var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() - if err != nil { - failure = lnwire.NewTemporaryChannelFailure(nil) - } else { - failure = lnwire.NewAmountBelowMinimum( - pd.Amount, *update) - } - - l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, - ) - needUpdate = true - continue - } - - // Next, using the amount of the incoming HTLC, we'll - // calculate the expected fee this incoming HTLC must - // carry in order to be accepted. - expectedFee := ExpectedFee( - l.cfg.FwrdingPolicy, - fwdInfo.AmountToForward, - ) - - // If the actual fee is less than our expected - // fee, then we'll reject this HTLC as it didn't - // provide a sufficient amount of fees, or the values - // have been tampered with, or the send used - // incorrect/dated information to construct the - // forwarding information for this hop. In any case, - // we'll cancel this HTLC. - actualFee := pd.Amount - fwdInfo.AmountToForward - if pd.Amount < fwdInfo.AmountToForward || - actualFee < expectedFee { - - log.Errorf("Incoming htlc(%x) has insufficient "+ - "fee: expected %v, got %v", pd.RHash[:], - int64(expectedFee), - int64(pd.Amount-fwdInfo.AmountToForward)) - - // As part of the returned error, we'll send - // our latest routing policy so the sending - // node obtains the most up to date data. - var failure lnwire.FailureMessage - update, err := l.cfg.GetLastChannelUpdate() - if err != nil { - failure = lnwire.NewTemporaryChannelFailure(nil) - } else { - failure = lnwire.NewFeeInsufficient(pd.Amount, - *update) - } - - l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, pd.SourceRef, + pd.HtlcIndex, failure, obfuscator, + pd.SourceRef, ) needUpdate = true continue @@ -2220,7 +2203,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // Grab the latest routing policy so the // sending node is up to date with our current // policy. - update, err := l.cfg.GetLastChannelUpdate() + update, err := l.cfg.FetchLastChannelUpdate( + l.shortChanID, + ) if err != nil { l.fail("unable to create channel update "+ "while handling the error: %v", err) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 0fc30c831..f61205562 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -893,13 +893,37 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } interfaceLinks, _ := s.getLinks(targetLink.Peer().PubKey()) + // We'll keep track of any HTLC failures during the link + // selection process. This way we can return the error for + // precise link that the sender selected, while optimistically + // trying all links to utilize our available bandwidth. + linkErrs := make(map[lnwire.ShortChannelID]lnwire.FailureMessage) + // Try to find destination channel link with appropriate // bandwidth. var destination ChannelLink for _, link := range interfaceLinks { // We'll skip any links that aren't yet eligible for // forwarding. - if !link.EligibleToForward() { + switch { + case !link.EligibleToForward(): + continue + + // If the link doesn't yet have a source chan ID, then + // we'll skip it as well. + case link.ShortChanID() == sourceHop: + continue + } + + // Before we check the link's bandwidth, we'll ensure + // that the HTLC satisfies the current forwarding + // policy of this target link. + err := link.HtlcSatifiesPolicy( + htlc.PaymentHash, packet.incomingAmount, + packet.amount, + ) + if err != nil { + linkErrs[link.ShortChanID()] = err continue } @@ -910,10 +934,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } } + switch { // If the channel link we're attempting to forward the update - // over has insufficient capacity, then we'll cancel the htlc - // as the payment cannot succeed. - if destination == nil { + // over has insufficient capacity, and didn't violate any + // forwarding policies, then we'll cancel the htlc as the + // payment cannot succeed. + case destination == nil && len(linkErrs) == 0: // If packet was forwarded from another channel link // than we should notify this link that some error // occurred. @@ -923,6 +949,34 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { "%v", htlc.Amount) return s.failAddPacket(packet, failure, addErr) + + // If we had a forwarding failure due to the HTLC not + // satisfying the current policy, then we'll send back an + // error, but ensure we send back the error sourced at the + // *target* link. + case destination == nil && len(linkErrs) != 0: + // At this point, some or all of the links rejected the + // HTLC so we couldn't forward it. So we'll try to look + // up the error that came from the source. + linkErr, ok := linkErrs[packet.outgoingChanID] + if !ok { + // If we can't find the error of the source, + // then we'll return an unknown next peer, + // though this should never happen. + linkErr = &lnwire.FailUnknownNextPeer{} + log.Warnf("unable to find err source for "+ + "outgoing_link=%v, errors=%v", + packet.outgoingChanID, newLogClosure(func() string { + return spew.Sdump(linkErrs) + })) + } + + addErr := fmt.Errorf("incoming HTLC(%x) violated "+ + "target outgoing link (id=%v) policy: %v", + htlc.PaymentHash[:], packet.outgoingChanID, + linkErr) + + return s.failAddPacket(packet, linkErr, addErr) } // Send the packet to the destination channel link which