diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index 0db6fa11e..85cca3529 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -111,6 +111,9 @@ circuit. The indices are only available for forwarding events saved after v0.20. for payments. Now the payment address is mandatory for the writer and reader of a payment request. +- [Refactored](https://github.com/lightningnetwork/lnd/pull/10018) `channelLink` + to improve readability and maintainability of the code. + ## Breaking Changes ## Performance Improvements diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 33551fa3a..25d06d83c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -36,10 +36,6 @@ import ( "github.com/lightningnetwork/lnd/tlv" ) -func init() { - prand.Seed(time.Now().UnixNano()) -} - const ( // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that // the node accepts for forwarded payments. The value is relative to the @@ -1022,7 +1018,11 @@ func (l *channelLink) syncChanStates(ctx context.Context) error { // immediately so we return to a synchronized state as soon as // possible. for _, msg := range msgsToReSend { - l.cfg.Peer.SendMessage(false, msg) + err := l.cfg.Peer.SendMessage(false, msg) + if err != nil { + l.log.Errorf("failed to send %v: %v", + msg.MsgType(), err) + } } case <-l.cg.Done(): @@ -1252,8 +1252,6 @@ func (l *channelLink) handleChanSyncErr(err error) { // and also the htlc trickle queue+timer for this active channels. // // NOTE: This MUST be run as a goroutine. -// -//nolint:funlen func (l *channelLink) htlcManager(ctx context.Context) { defer func() { l.cfg.BatchTicker.Stop() @@ -1270,43 +1268,14 @@ func (l *channelLink) htlcManager(ctx context.Context) { l.cfg.NotifyActiveLink(l.ChannelPoint()) defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint()) - // TODO(roasbeef): need to call wipe chan whenever D/C? - - // If this isn't the first time that this channel link has been - // created, then we'll need to check to see if we need to - // re-synchronize state with the remote peer. settledHtlcs is a map of - // HTLC's that we re-settled as part of the channel state sync. - if l.cfg.SyncStates { - err := l.syncChanStates(ctx) - if err != nil { - l.handleChanSyncErr(err) - return - } + // If this is not the first time the link has been started, we need to + // take extra steps to resume its state. + err := l.resumeLink(ctx) + if err != nil { + l.log.Errorf("resuming link failed: %v", err) + return } - // If a shutdown message has previously been sent on this link, then we - // need to make sure that we have disabled any HTLC adds on the outgoing - // direction of the link and that we re-resend the same shutdown message - // that we previously sent. - l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { - // Immediately disallow any new outgoing HTLCs. - if !l.DisableAdds(Outgoing) { - l.log.Warnf("Outgoing link adds already disabled") - } - - // Re-send the shutdown message the peer. Since syncChanStates - // would have sent any outstanding CommitSig, it is fine for us - // to immediately queue the shutdown message now. - err := l.cfg.Peer.SendMessage(false, &shutdown) - if err != nil { - l.log.Warnf("Error sending shutdown message: %v", err) - } - }) - - // We've successfully reestablished the channel, mark it as such to - // allow the switch to forward HTLCs in the outbound direction.
- l.markReestablished() - // Now that we've received both channel_ready and channel reestablish, // we can go ahead and send the active channel notification. We'll also // defer the inactive notification for when the link exits to ensure @@ -1314,47 +1283,6 @@ func (l *channelLink) htlcManager(ctx context.Context) { l.cfg.NotifyActiveChannel(l.ChannelPoint()) defer l.cfg.NotifyInactiveChannel(l.ChannelPoint()) - // With the channel states synced, we now reset the mailbox to ensure - // we start processing all unacked packets in order. This is done here - // to ensure that all acknowledgments that occur during channel - // resynchronization have taken affect, causing us only to pull unacked - // packets after starting to read from the downstream mailbox. - l.mailBox.ResetPackets() - - // After cleaning up any memory pertaining to incoming packets, we now - // replay our forwarding packages to handle any htlcs that can be - // processed locally, or need to be forwarded out to the switch. We will - // only attempt to resolve packages if our short chan id indicates that - // the channel is not pending, otherwise we should have no htlcs to - // reforward. - if l.ShortChanID() != hop.Source { - err := l.resolveFwdPkgs(ctx) - switch err { - // No error was encountered, success. - case nil: - - // If the duplicate keystone error was encountered, we'll fail - // without sending an Error message to the peer. - case ErrDuplicateKeystone: - l.failf(LinkFailureError{code: ErrCircuitError}, - "temporary circuit error: %v", err) - return - - // A non-nil error was encountered, send an Error message to - // the peer. - default: - l.failf(LinkFailureError{code: ErrInternalError}, - "unable to resolve fwd pkgs: %v", err) - return - } - - // With our link's in-memory state fully reconstructed, spawn a - // goroutine to manage the reclamation of disk space occupied by - // completed forwarding packages. - l.cg.WgAdd(1) - go l.fwdPkgGarbager() - } - for { // We must always check if we failed at some point processing // the last update before processing the next. @@ -1363,24 +1291,8 @@ func (l *channelLink) htlcManager(ctx context.Context) { return } - // If the previous event resulted in a non-empty batch, resume - // the batch ticker so that it can be cleared. Otherwise pause - // the ticker to prevent waking up the htlcManager while the - // batch is empty. - numUpdates := l.channel.NumPendingUpdates( - lntypes.Local, lntypes.Remote, - ) - if numUpdates > 0 { - l.cfg.BatchTicker.Resume() - l.log.Tracef("BatchTicker resumed, "+ - "NumPendingUpdates(Local, Remote)=%d", - numUpdates, - ) - } else { - l.cfg.BatchTicker.Pause() - l.log.Trace("BatchTicker paused due to zero " + - "NumPendingUpdates(Local, Remote)") - } + // Pause or resume the batch ticker. + l.toggleBatchTicker() select { // We have a new hook that needs to be run when we reach a clean @@ -1414,50 +1326,10 @@ func (l *channelLink) htlcManager(ctx context.Context) { // fee to see if we should adjust our commitment fee. case <-l.updateFeeTimer.C: l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout()) - - // If we're not the initiator of the channel, don't we - // don't control the fees, so we can ignore this. - if !l.channel.IsInitiator() { - continue - } - - // If we are the initiator, then we'll sample the - // current fee rate to get into the chain within 3 - // blocks. 
- netFee, err := l.sampleNetworkFee() + err := l.handleUpdateFee(ctx) if err != nil { - l.log.Errorf("unable to sample network fee: %v", - err) - continue - } - - minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW() - - newCommitFee := l.channel.IdealCommitFeeRate( - netFee, minRelayFee, - l.cfg.MaxAnchorsCommitFeeRate, - l.cfg.MaxFeeAllocation, - ) - - // We determine if we should adjust the commitment fee - // based on the current commitment fee, the suggested - // new commitment fee and the current minimum relay fee - // rate. - commitFee := l.channel.CommitFeeRate() - if !shouldAdjustCommitFee( - newCommitFee, commitFee, minRelayFee, - ) { - - continue - } - - // If we do, then we'll send a new UpdateFee message to - // the remote party, to be locked in with a new update. - err = l.updateChannelFee(ctx, newCommitFee) - if err != nil { - l.log.Errorf("unable to update fee rate: %v", - err) - continue + l.log.Errorf("failed to handle update fee: "+ + "%v", err) } // The underlying channel has notified us of a unilateral close @@ -1511,43 +1383,19 @@ func (l *channelLink) htlcManager(ctx context.Context) { // A htlc resolution is received. This means that we now have a // resolution for a previously accepted htlc. case hodlItem := <-l.hodlQueue.ChanOut(): - htlcResolution := hodlItem.(invoices.HtlcResolution) - err := l.processHodlQueue(ctx, htlcResolution) - switch err { - // No error, success. - case nil: - - // If the duplicate keystone error was encountered, - // fail back gracefully. - case ErrDuplicateKeystone: - l.failf(LinkFailureError{ - code: ErrCircuitError, - }, "process hodl queue: "+ - "temporary circuit error: %v", - err, - ) - - // Send an Error message to the peer. - default: - l.failf(LinkFailureError{ - code: ErrInternalError, - }, "process hodl queue: unable to update "+ - "commitment: %v", err, - ) + err := l.handleHtlcResolution(ctx, hodlItem) + if err != nil { + l.log.Errorf("failed to handle htlc "+ + "resolution: %v", err) } + // A user-initiated quiescence request is received. We now + // forward it to the quiescer. case qReq := <-l.quiescenceReqs: - l.quiescer.InitStfu(qReq) - - if l.noDanglingUpdates(lntypes.Local) { - err := l.quiescer.SendOwedStfu() - if err != nil { - l.stfuFailf( - "SendOwedStfu: %s", err.Error(), - ) - res := fn.Err[lntypes.ChannelParty](err) - qReq.Resolve(res) - } + err := l.handleQuiescenceReq(qReq) + if err != nil { + l.log.Errorf("failed to handle quiescence "+ + "req: %v", err) } case <-l.cg.Done(): @@ -1775,7 +1623,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context, l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) - _ = l.cfg.Peer.SendMessage(false, htlc) + err = l.cfg.Peer.SendMessage(false, htlc) + if err != nil { + l.log.Errorf("failed to send UpdateAddHTLC: %v", err) + } // Send a forward event notification to htlcNotifier. l.cfg.HtlcNotifier.NotifyForwardingEvent( @@ -1819,161 +1670,10 @@ func (l *channelLink) handleDownstreamPkt(ctx context.Context, _ = l.handleDownstreamUpdateAdd(ctx, pkt) case *lnwire.UpdateFulfillHTLC: - // If hodl.SettleOutgoing mode is active, we exit early to - // simulate arbitrary delays between the switch adding the - // SETTLE to the mailbox, and the HTLC being added to the - // commitment state. - if l.cfg.HodlMask.Active(hodl.SettleOutgoing) { - l.log.Warnf(hodl.SettleOutgoing.Warning()) - l.mailBox.AckPacket(pkt.inKey()) - return - } - - // An HTLC we forward to the switch has just settled somewhere - // upstream.
Therefore we settle the HTLC within the our local - // state machine. - inKey := pkt.inKey() - err := l.channel.SettleHTLC( - htlc.PaymentPreimage, - pkt.incomingHTLCID, - pkt.sourceRef, - pkt.destRef, - &inKey, - ) - if err != nil { - l.log.Errorf("unable to settle incoming HTLC for "+ - "circuit-key=%v: %v", inKey, err) - - // If the HTLC index for Settle response was not known - // to our commitment state, it has already been - // cleaned up by a prior response. We'll thus try to - // clean up any lingering state to ensure we don't - // continue reforwarding. - if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { - l.cleanupSpuriousResponse(pkt) - } - - // Remove the packet from the link's mailbox to ensure - // it doesn't get replayed after a reconnection. - l.mailBox.AckPacket(inKey) - - return - } - - l.log.Debugf("queueing removal of SETTLE closed circuit: "+ - "%s->%s", pkt.inKey(), pkt.outKey()) - - l.closedCircuits = append(l.closedCircuits, pkt.inKey()) - - // With the HTLC settled, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // canceled. - htlc.ChanID = l.ChanID() - htlc.ID = pkt.incomingHTLCID - - // Then we send the HTLC settle message to the connected peer - // so we can continue the propagation of the settle message. - l.cfg.Peer.SendMessage(false, htlc) - - // Send a settle event notification to htlcNotifier. - l.cfg.HtlcNotifier.NotifySettleEvent( - newHtlcKey(pkt), - htlc.PaymentPreimage, - getEventType(pkt), - ) - - // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail(ctx) + l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc) case *lnwire.UpdateFailHTLC: - // If hodl.FailOutgoing mode is active, we exit early to - // simulate arbitrary delays between the switch adding a FAIL to - // the mailbox, and the HTLC being added to the commitment - // state. - if l.cfg.HodlMask.Active(hodl.FailOutgoing) { - l.log.Warnf(hodl.FailOutgoing.Warning()) - l.mailBox.AckPacket(pkt.inKey()) - return - } - - // An HTLC cancellation has been triggered somewhere upstream, - // we'll remove then HTLC from our local state machine. - inKey := pkt.inKey() - err := l.channel.FailHTLC( - pkt.incomingHTLCID, - htlc.Reason, - pkt.sourceRef, - pkt.destRef, - &inKey, - ) - if err != nil { - l.log.Errorf("unable to cancel incoming HTLC for "+ - "circuit-key=%v: %v", inKey, err) - - // If the HTLC index for Fail response was not known to - // our commitment state, it has already been cleaned up - // by a prior response. We'll thus try to clean up any - // lingering state to ensure we don't continue - // reforwarding. - if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { - l.cleanupSpuriousResponse(pkt) - } - - // Remove the packet from the link's mailbox to ensure - // it doesn't get replayed after a reconnection. - l.mailBox.AckPacket(inKey) - - return - } - - l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s", - pkt.inKey(), pkt.outKey()) - - l.closedCircuits = append(l.closedCircuits, pkt.inKey()) - - // With the HTLC removed, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // canceled. The "Reason" field will have already been set - // within the switch. - htlc.ChanID = l.ChanID() - htlc.ID = pkt.incomingHTLCID - - // We send the HTLC message to the peer which initially created - // the HTLC. If the incoming blinding point is non-nil, we - // know that we are a relaying node in a blinded path. 
- // Otherwise, we're either an introduction node or not part of - // a blinded path at all. - if err := l.sendIncomingHTLCFailureMsg( - htlc.ID, - pkt.obfuscator, - htlc.Reason, - ); err != nil { - l.log.Errorf("unable to send HTLC failure: %v", - err) - - return - } - - // If the packet does not have a link failure set, it failed - // further down the route so we notify a forwarding failure. - // Otherwise, we notify a link failure because it failed at our - // node. - if pkt.linkFailure != nil { - l.cfg.HtlcNotifier.NotifyLinkFailEvent( - newHtlcKey(pkt), - newHtlcInfo(pkt), - getEventType(pkt), - pkt.linkFailure, - false, - ) - } else { - l.cfg.HtlcNotifier.NotifyForwardingFailEvent( - newHtlcKey(pkt), getEventType(pkt), - ) - } - - // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail(ctx) + l.processLocalUpdateFailHTLC(ctx, pkt, htlc) } } @@ -2055,8 +1755,6 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. -// -//nolint:funlen func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg lnwire.Message) { @@ -2070,552 +1768,32 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, return } + var err error + switch msg := msg.(type) { case *lnwire.UpdateAddHTLC: - if l.IsFlushing(Incoming) { - // This is forbidden by the protocol specification. - // The best chance we have to deal with this is to drop - // the connection. This should roll back the channel - // state to the last CommitSig. If the remote has - // already sent a CommitSig we haven't received yet, - // channel state will be re-synchronized with a - // ChannelReestablish message upon reconnection and the - // protocol state that caused us to flush the link will - // be rolled back. In the event that there was some - // non-deterministic behavior in the remote that caused - // them to violate the protocol, we have a decent shot - // at correcting it this way, since reconnecting will - // put us in the cleanest possible state to try again. - // - // In addition to the above, it is possible for us to - // hit this case in situations where we improperly - // handle message ordering due to concurrency choices. - // An issue has been filed to address this here: - // https://github.com/lightningnetwork/lnd/issues/8393 - l.failf( - LinkFailureError{ - code: ErrInvalidUpdate, - FailureAction: LinkFailureDisconnect, - PermanentFailure: false, - Warning: true, - }, - "received add while link is flushing", - ) - - return - } - - // Disallow htlcs with blinding points set if we haven't - // enabled the feature. This saves us from having to process - // the onion at all, but will only catch blinded payments - // where we are a relaying node (as the blinding point will - // be in the payload when we're the introduction node). - if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "blinding point included when route blinding "+ - "is disabled") - - return - } - - // We have to check the limit here rather than later in the - // switch because the counterparty can keep sending HTLC's - // without sending a revoke. This would mean that the switch - // check would only occur later. 
- if l.isOverexposedWithHtlc(msg, true) { - l.failf(LinkFailureError{code: ErrInternalError}, - "peer sent us an HTLC that exceeded our max "+ - "fee exposure") - - return - } - - // We just received an add request from an upstream peer, so we - // add it to our state machine, then add the HTLC to our - // "settle" list in the event that we know the preimage. - index, err := l.channel.ReceiveHTLC(msg) - if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream add HTLC: %v", err) - return - } - - l.log.Tracef("receive upstream htlc with payment hash(%x), "+ - "assigning index: %v", msg.PaymentHash[:], index) + err = l.processRemoteUpdateAddHTLC(msg) case *lnwire.UpdateFulfillHTLC: - pre := msg.PaymentPreimage - idx := msg.ID - - // Before we pipeline the settle, we'll check the set of active - // htlc's to see if the related UpdateAddHTLC has been fully - // locked-in. - var lockedin bool - htlcs := l.channel.ActiveHtlcs() - for _, add := range htlcs { - // The HTLC will be outgoing and match idx. - if !add.Incoming && add.HtlcIndex == idx { - lockedin = true - break - } - } - - if !lockedin { - l.failf( - LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream settle", - ) - return - } - - if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { - l.failf( - LinkFailureError{ - code: ErrInvalidUpdate, - FailureAction: LinkFailureForceClose, - }, - "unable to handle upstream settle HTLC: %v", err, - ) - return - } - - settlePacket := &htlcPacket{ - outgoingChanID: l.ShortChanID(), - outgoingHTLCID: idx, - htlc: &lnwire.UpdateFulfillHTLC{ - PaymentPreimage: pre, - }, - } - - // Add the newly discovered preimage to our growing list of - // uncommitted preimage. These will be written to the witness - // cache just before accepting the next commitment signature - // from the remote peer. - l.uncommittedPreimages = append(l.uncommittedPreimages, pre) - - // Pipeline this settle, send it to the switch. - go l.forwardBatch(false, settlePacket) + err = l.processRemoteUpdateFulfillHTLC(msg) case *lnwire.UpdateFailMalformedHTLC: - // Convert the failure type encoded within the HTLC fail - // message to the proper generic lnwire error code. - var failure lnwire.FailureMessage - switch msg.FailureCode { - case lnwire.CodeInvalidOnionVersion: - failure = &lnwire.FailInvalidOnionVersion{ - OnionSHA256: msg.ShaOnionBlob, - } - case lnwire.CodeInvalidOnionHmac: - failure = &lnwire.FailInvalidOnionHmac{ - OnionSHA256: msg.ShaOnionBlob, - } - - case lnwire.CodeInvalidOnionKey: - failure = &lnwire.FailInvalidOnionKey{ - OnionSHA256: msg.ShaOnionBlob, - } - - // Handle malformed errors that are part of a blinded route. - // This case is slightly different, because we expect every - // relaying node in the blinded portion of the route to send - // malformed errors. If we're also a relaying node, we're - // likely going to switch this error out anyway for our own - // malformed error, but we handle the case here for - // completeness. - case lnwire.CodeInvalidBlinding: - failure = &lnwire.FailInvalidBlinding{ - OnionSHA256: msg.ShaOnionBlob, - } - - default: - l.log.Warnf("unexpected failure code received in "+ - "UpdateFailMailformedHTLC: %v", msg.FailureCode) - - // We don't just pass back the error we received from - // our successor. Otherwise we might report a failure - // that penalizes us more than needed. If the onion that - // we forwarded was correct, the node should have been - // able to send back its own failure. 
The node did not - // send back its own failure, so we assume there was a - // problem with the onion and report that back. We reuse - // the invalid onion key failure because there is no - // specific error for this case. - failure = &lnwire.FailInvalidOnionKey{ - OnionSHA256: msg.ShaOnionBlob, - } - } - - // With the error parsed, we'll convert the into it's opaque - // form. - var b bytes.Buffer - if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { - l.log.Errorf("unable to encode malformed error: %v", err) - return - } - - // If remote side have been unable to parse the onion blob we - // have sent to it, than we should transform the malformed HTLC - // message to the usual HTLC fail message. - err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) - if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream fail HTLC: %v", err) - return - } + err = l.processRemoteUpdateFailMalformedHTLC(msg) case *lnwire.UpdateFailHTLC: - // Verify that the failure reason is at least 256 bytes plus - // overhead. - const minimumFailReasonLength = lnwire.FailureMessageLength + - 2 + 2 + 32 - - if len(msg.Reason) < minimumFailReasonLength { - // We've received a reason with a non-compliant length. - // Older nodes happily relay back these failures that - // may originate from a node further downstream. - // Therefore we can't just fail the channel. - // - // We want to be compliant ourselves, so we also can't - // pass back the reason unmodified. And we must make - // sure that we don't hit the magic length check of 260 - // bytes in processRemoteSettleFails either. - // - // Because the reason is unreadable for the payer - // anyway, we just replace it by a compliant-length - // series of random bytes. - msg.Reason = make([]byte, minimumFailReasonLength) - _, err := crand.Read(msg.Reason[:]) - if err != nil { - l.log.Errorf("Random generation error: %v", err) - - return - } - } - - // Add fail to the update log. - idx := msg.ID - err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) - if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream fail HTLC: %v", err) - return - } + err = l.processRemoteUpdateFailHTLC(msg) case *lnwire.CommitSig: - // Since we may have learned new preimages for the first time, - // we'll add them to our preimage cache. By doing this, we - // ensure any contested contracts watched by any on-chain - // arbitrators can now sweep this HTLC on-chain. We delay - // committing the preimages until just before accepting the new - // remote commitment, as afterwards the peer won't resend the - // Settle messages on the next channel reestablishment. Doing so - // allows us to more effectively batch this operation, instead - // of doing a single write per preimage. - err := l.cfg.PreimageCache.AddPreimages( - l.uncommittedPreimages..., - ) - if err != nil { - l.failf( - LinkFailureError{code: ErrInternalError}, - "unable to add preimages=%v to cache: %v", - l.uncommittedPreimages, err, - ) - return - } - - // Instead of truncating the slice to conserve memory - // allocations, we simply set the uncommitted preimage slice to - // nil so that a new one will be initialized if any more - // witnesses are discovered. We do this because the maximum size - // that the slice can occupy is 15KB, and we want to ensure we - // release that memory back to the runtime. 
- l.uncommittedPreimages = nil - - // We just received a new updates to our local commitment - // chain, validate this new commitment, closing the link if - // invalid. - auxSigBlob, err := msg.CustomRecords.Serialize() - if err != nil { - l.failf( - LinkFailureError{code: ErrInvalidCommitment}, - "unable to serialize custom records: %v", err, - ) - - return - } - err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ - CommitSig: msg.CommitSig, - HtlcSigs: msg.HtlcSigs, - PartialSig: msg.PartialSig, - AuxSigBlob: auxSigBlob, - }) - if err != nil { - // If we were unable to reconstruct their proposed - // commitment, then we'll examine the type of error. If - // it's an InvalidCommitSigError, then we'll send a - // direct error. - var sendData []byte - switch err.(type) { - case *lnwallet.InvalidCommitSigError: - sendData = []byte(err.Error()) - case *lnwallet.InvalidHtlcSigError: - sendData = []byte(err.Error()) - } - l.failf( - LinkFailureError{ - code: ErrInvalidCommitment, - FailureAction: LinkFailureForceClose, - SendData: sendData, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As we've just accepted a new state, we'll now - // immediately send the remote peer a revocation for our prior - // state. - nextRevocation, currentHtlcs, finalHTLCs, err := - l.channel.RevokeCurrentCommitment() - if err != nil { - l.log.Errorf("unable to revoke commitment: %v", err) - - // We need to fail the channel in case revoking our - // local commitment does not succeed. We might have - // already advanced our channel state which would lead - // us to proceed with an unclean state. - // - // NOTE: We do not trigger a force close because this - // could resolve itself in case our db was just busy - // not accepting new transactions. - l.failf( - LinkFailureError{ - code: ErrInternalError, - Warning: true, - FailureAction: LinkFailureDisconnect, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As soon as we are ready to send our next revocation, we can - // invoke the incoming commit hooks. - l.RWMutex.Lock() - l.incomingCommitHooks.invoke() - l.RWMutex.Unlock() - - l.cfg.Peer.SendMessage(false, nextRevocation) - - // Notify the incoming htlcs of which the resolutions were - // locked in. - for id, settled := range finalHTLCs { - l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( - models.CircuitKey{ - ChanID: l.ShortChanID(), - HtlcID: id, - }, - channeldb.FinalHtlcInfo{ - Settled: settled, - Offchain: true, - }, - ) - } - - // Since we just revoked our commitment, we may have a new set - // of HTLC's on our commitment, so we'll send them using our - // function closure NotifyContractUpdate. - newUpdate := &contractcourt.ContractUpdate{ - HtlcKey: contractcourt.LocalHtlcSet, - Htlcs: currentHtlcs, - } - err = l.cfg.NotifyContractUpdate(newUpdate) - if err != nil { - l.log.Errorf("unable to notify contract update: %v", - err) - return - } - - select { - case <-l.cg.Done(): - return - default: - } - - // If the remote party initiated the state transition, - // we'll reply with a signature to provide them with their - // version of the latest commitment. Otherwise, both commitment - // chains are fully synced from our PoV, then we don't need to - // reply with a signature as both sides already have a - // commitment with the latest accepted. 
- if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail(ctx) { - return - } - } - - // If we need to send out an Stfu, this would be the time to do - // so. - if l.noDanglingUpdates(lntypes.Local) { - err = l.quiescer.SendOwedStfu() - if err != nil { - l.stfuFailf("sendOwedStfu: %v", err.Error()) - } - } - - // Now that we have finished processing the incoming CommitSig - // and sent out our RevokeAndAck, we invoke the flushHooks if - // the channel state is clean. - l.RWMutex.Lock() - if l.channel.IsChannelClean() { - l.flushHooks.invoke() - } - l.RWMutex.Unlock() + err = l.processRemoteCommitSig(ctx, msg) case *lnwire.RevokeAndAck: - // We've received a revocation from the remote chain, if valid, - // this moves the remote chain forward, and expands our - // revocation window. - - // We now process the message and advance our remote commit - // chain. - fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg) - if err != nil { - // TODO(halseth): force close? - l.failf( - LinkFailureError{ - code: ErrInvalidRevocation, - FailureAction: LinkFailureDisconnect, - }, - "unable to accept revocation: %v", err, - ) - return - } - - // The remote party now has a new primary commitment, so we'll - // update the contract court to be aware of this new set (the - // prior old remote pending). - newUpdate := &contractcourt.ContractUpdate{ - HtlcKey: contractcourt.RemoteHtlcSet, - Htlcs: remoteHTLCs, - } - err = l.cfg.NotifyContractUpdate(newUpdate) - if err != nil { - l.log.Errorf("unable to notify contract update: %v", - err) - return - } - - select { - case <-l.cg.Done(): - return - default: - } - - // If we have a tower client for this channel type, we'll - // create a backup for the current state. - if l.cfg.TowerClient != nil { - state := l.channel.State() - chanID := l.ChanID() - - err = l.cfg.TowerClient.BackupState( - &chanID, state.RemoteCommitment.CommitHeight-1, - ) - if err != nil { - l.failf(LinkFailureError{ - code: ErrInternalError, - }, "unable to queue breach backup: %v", err) - return - } - } - - // If we can send updates then we can process adds in case we - // are the exit hop and need to send back resolutions, or in - // case there are validity issues with the packets. Otherwise - // we defer the action until resume. - // - // We are free to process the settles and fails without this - // check since processing those can't result in further updates - // to this channel link. - if l.quiescer.CanSendUpdates() { - l.processRemoteAdds(fwdPkg) - } else { - l.quiescer.OnResume(func() { - l.processRemoteAdds(fwdPkg) - }) - } - l.processRemoteSettleFails(fwdPkg) - - // If the link failed during processing the adds, we must - // return to ensure we won't attempted to update the state - // further. - if l.failed { - return - } - - // The revocation window opened up. If there are pending local - // updates, try to update the commit tx. Pending updates could - // already have been present because of a previously failed - // update to the commit tx or freshly added in by - // processRemoteAdds. Also in case there are no local updates, - // but there are still remote updates that are not in the remote - // commit tx yet, send out an update. - if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail(ctx) { - return - } - } - - // Now that we have finished processing the RevokeAndAck, we - // can invoke the flushHooks if the channel state is clean. 
- l.RWMutex.Lock() - if l.channel.IsChannelClean() { - l.flushHooks.invoke() - } - l.RWMutex.Unlock() + err = l.processRemoteRevokeAndAck(ctx, msg) case *lnwire.UpdateFee: - // Check and see if their proposed fee-rate would make us - // exceed the fee threshold. - fee := chainfee.SatPerKWeight(msg.FeePerKw) - - isDust, err := l.exceedsFeeExposureLimit(fee) - if err != nil { - // This shouldn't typically happen. If it does, it - // indicates something is wrong with our channel state. - l.log.Errorf("Unable to determine if fee threshold " + - "exceeded") - l.failf(LinkFailureError{code: ErrInternalError}, - "error calculating fee exposure: %v", err) - - return - } - - if isDust { - // The proposed fee-rate makes us exceed the fee - // threshold. - l.failf(LinkFailureError{code: ErrInternalError}, - "fee threshold exceeded: %v", err) - return - } - - // We received fee update from peer. If we are the initiator we - // will fail the channel, if not we will apply the update. - if err := l.channel.ReceiveUpdateFee(fee); err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "error receiving fee update: %v", err) - return - } - - // Update the mailbox's feerate as well. - l.mailBox.SetFeeRate(fee) + err = l.processRemoteUpdateFee(msg) case *lnwire.Stfu: - err := l.handleStfu(msg) + err = l.handleStfu(msg) if err != nil { l.stfuFailf("handleStfu: %v", err.Error()) } @@ -2628,28 +1806,16 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg.Warning()) case *lnwire.Error: - // Error received from remote, MUST fail channel, but should - // only print the contents of the error message if all - // characters are printable ASCII. - l.failf( - LinkFailureError{ - code: ErrRemoteError, + l.processRemoteError(msg) - // TODO(halseth): we currently don't fail the - // channel permanently, as there are some sync - // issues with other implementations that will - // lead to them sending an error message, but - // we can recover from on next connection. See - // https://github.com/ElementsProject/lightning/issues/4212 - PermanentFailure: false, - }, - "ChannelPoint(%v): received error from peer: %v", - l.channel.ChannelPoint(), msg.Error(), - ) default: l.log.Warnf("received unknown message of type %T", msg) } + if err != nil { + l.log.Errorf("failed to process remote %v: %v", msg.MsgType(), + err) + } } // handleStfu implements the top-level logic for handling the Stfu message from @@ -2757,13 +1923,13 @@ func (l *channelLink) ackDownStreamPackets() error { // the link. func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool { err := l.updateCommitTx(ctx) - switch err { + switch { // No error encountered, success. - case nil: + case err == nil: // A duplicate keystone error should be resolved and is not fatal, so // we won't send an Error message to the peer. - case ErrDuplicateKeystone: + case errors.Is(err, ErrDuplicateKeystone): l.failf(LinkFailureError{code: ErrCircuitError}, "temporary circuit error: %v", err) return false @@ -2862,7 +2028,10 @@ func (l *channelLink) updateCommitTx(ctx context.Context) error { PartialSig: newCommit.PartialSig, CustomRecords: auxBlobRecords, } - l.cfg.Peer.SendMessage(false, commitSig) + err = l.cfg.Peer.SendMessage(false, commitSig) + if err != nil { + l.log.Errorf("failed to send CommitSig: %v", err) + } // Now that we have sent out a new CommitSig, we invoke the outgoing set // of commit hooks. 
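Worth pausing on the `updateCommitTxOrFail` change in the hunk above: moving from `switch err` to `switch { case errors.Is(err, ErrDuplicateKeystone): }` means the sentinel still matches once some layer wraps it with `fmt.Errorf("...: %w", err)`. A minimal, self-contained Go sketch of the difference — the sentinel and the wrapping callee here are illustrative stand-ins, not lnd's actual call chain:

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicateKeystone is a hypothetical sentinel, standing in for
// htlcswitch.ErrDuplicateKeystone.
var errDuplicateKeystone = errors.New("duplicate keystone")

// updateCommitTx simulates a callee that wraps the sentinel with
// context before returning it, as Go code commonly does with %w.
func updateCommitTx() error {
	return fmt.Errorf("unable to update commitment: %w",
		errDuplicateKeystone)
}

func main() {
	err := updateCommitTx()

	// A direct comparison fails against the wrapped sentinel...
	fmt.Println(err == errDuplicateKeystone) // false

	// ...while errors.Is walks the wrap chain and still matches.
	fmt.Println(errors.Is(err, errDuplicateKeystone)) // true
}
```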
@@ -4323,11 +3492,14 @@ func (l *channelLink) settleHTLC(preimage lntypes.Preimage, // HTLC was successfully settled locally send notification about it // remote peer. - l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{ + err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{ ChanID: l.ChanID(), ID: htlcIndex, PaymentPreimage: preimage, }) + if err != nil { + l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err) + } // Once we have successfully settled the htlc, notify a settle event. l.cfg.HtlcNotifier.NotifySettleEvent( @@ -4522,12 +3694,15 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, return } - l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{ + err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{ ChanID: l.ChanID(), ID: htlcIndex, ShaOnionBlob: shaOnionBlob, FailureCode: code, }) + if err != nil { + l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err) + } } // failf is a function which is used to encapsulate the action necessary for @@ -4579,3 +3754,959 @@ func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] { return l.channel.LocalCommitmentBlob() } + +// handleHtlcResolution takes an HTLC resolution and processes it by draining +// the hodlQueue. Once processed, a commit_sig is sent to the remote to update +// their commitment. +func (l *channelLink) handleHtlcResolution(ctx context.Context, + hodlItem any) error { + + htlcResolution, ok := hodlItem.(invoices.HtlcResolution) + if !ok { + return fmt.Errorf("expected HtlcResolution, got %T", hodlItem) + } + + err := l.processHodlQueue(ctx, htlcResolution) + // No error, success. + if err == nil { + return nil + } + + switch { + // If the duplicate keystone error was encountered, fail back + // gracefully. + case errors.Is(err, ErrDuplicateKeystone): + l.failf( + LinkFailureError{ + code: ErrCircuitError, + }, + "process hodl queue: temporary circuit error: %v", err, + ) + + // Send an Error message to the peer. + default: + l.failf( + LinkFailureError{ + code: ErrInternalError, + }, + "process hodl queue: unable to update commitment: %v", + err, + ) + } + + return err +} + +// handleQuiescenceReq takes a locally initiated (RPC) quiescence request and +// forwards it to the quiescer for further processing. +func (l *channelLink) handleQuiescenceReq(req StfuReq) error { + l.quiescer.InitStfu(req) + + if !l.noDanglingUpdates(lntypes.Local) { + return nil + } + + err := l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("SendOwedStfu: %s", err.Error()) + res := fn.Err[lntypes.ChannelParty](err) + req.Resolve(res) + } + + return err +} + +// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to +// decide whether we should send an `update_fee` msg to update the commitment's +// feerate. +func (l *channelLink) handleUpdateFee(ctx context.Context) error { + // If we're not the initiator of the channel, we don't control the fees, + // so we can ignore this. + if !l.channel.IsInitiator() { + return nil + } + + // If we are the initiator, then we'll sample the current fee rate to + // get into the chain within 3 blocks.
+ netFee, err := l.sampleNetworkFee() + if err != nil { + return fmt.Errorf("unable to sample network fee: %w", err) + } + + minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW() + + newCommitFee := l.channel.IdealCommitFeeRate( + netFee, minRelayFee, + l.cfg.MaxAnchorsCommitFeeRate, + l.cfg.MaxFeeAllocation, + ) + + // We determine if we should adjust the commitment fee based on the + // current commitment fee, the suggested new commitment fee and the + // current minimum relay fee rate. + commitFee := l.channel.CommitFeeRate() + if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) { + return nil + } + + // If we do, then we'll send a new UpdateFee message to the remote + // party, to be locked in with a new update. + err = l.updateChannelFee(ctx, newCommitFee) + if err != nil { + return fmt.Errorf("unable to update fee rate: %w", err) + } + + return nil +} + +// toggleBatchTicker checks whether we need to resume or pause the batch ticker. +// When we have no pending updates, the ticker is paused, otherwise resumed. +func (l *channelLink) toggleBatchTicker() { + // If the previous event resulted in a non-empty batch, resume the batch + // ticker so that it can be cleared. Otherwise pause the ticker to + // prevent waking up the htlcManager while the batch is empty. + numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) + if numUpdates > 0 { + l.cfg.BatchTicker.Resume() + l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+ "Remote)=%d", numUpdates) + + return + } + + l.cfg.BatchTicker.Pause() + l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" + "(Local, Remote)") +} + +// resumeLink is called when restarting a previously created link. It will go +// through the reestablishment protocol and reforward packets that are not yet +// resolved. +func (l *channelLink) resumeLink(ctx context.Context) error { + // If this isn't the first time that this channel link has been created, + // then we'll need to check to see if we need to re-synchronize state + // with the remote peer. settledHtlcs is a map of HTLC's that we + // re-settled as part of the channel state sync. + if l.cfg.SyncStates { + err := l.syncChanStates(ctx) + if err != nil { + l.handleChanSyncErr(err) + + return err + } + } + + // If a shutdown message has previously been sent on this link, then we + // need to make sure that we have disabled any HTLC adds on the outgoing + // direction of the link and that we re-send the same shutdown message + // that we previously sent. + // + // TODO(yy): we should either move this to chanCloser, or move all + // shutdown handling logic to be managed by the link, but not a mix of + // partial management by two subsystems. + l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { + // Immediately disallow any new outgoing HTLCs. + if !l.DisableAdds(Outgoing) { + l.log.Warnf("Outgoing link adds already disabled") + } + + // Re-send the shutdown message to the peer. Since syncChanStates + // would have sent any outstanding CommitSig, it is fine for us + // to immediately queue the shutdown message now. + err := l.cfg.Peer.SendMessage(false, &shutdown) + if err != nil { + l.log.Warnf("Error sending shutdown message: %v", err) + } + }) + + // We've successfully reestablished the channel, mark it as such to + // allow the switch to forward HTLCs in the outbound direction. + l.markReestablished() + + // With the channel states synced, we now reset the mailbox to ensure we + // start processing all unacked packets in order.
This is done here to + // ensure that all acknowledgments that occur during channel + // resynchronization have taken effect, causing us only to pull unacked + // packets after starting to read from the downstream mailbox. + err := l.mailBox.ResetPackets() + if err != nil { + l.log.Errorf("failed to reset packets: %v", err) + } + + // If the channel is pending, there's no need to reforward packets. + if l.ShortChanID() == hop.Source { + return nil + } + + // After cleaning up any memory pertaining to incoming packets, we now + // replay our forwarding packages to handle any htlcs that can be + // processed locally, or need to be forwarded out to the switch. We will + // only attempt to resolve packages if our short chan id indicates that + // the channel is not pending, otherwise we should have no htlcs to + // reforward. + err = l.resolveFwdPkgs(ctx) + switch { + // No error was encountered, success. + case err == nil: + // With our link's in-memory state fully reconstructed, spawn a + // goroutine to manage the reclamation of disk space occupied by + // completed forwarding packages. + l.cg.WgAdd(1) + go l.fwdPkgGarbager() + + return nil + + // If the duplicate keystone error was encountered, we'll fail without + // sending an Error message to the peer. + case errors.Is(err, ErrDuplicateKeystone): + l.failf(LinkFailureError{code: ErrCircuitError}, + "temporary circuit error: %v", err) + + // A non-nil error was encountered, send an Error message to + // the peer. + default: + l.failf(LinkFailureError{code: ErrInternalError}, + "unable to resolve fwd pkgs: %v", err) + } + + return err +} + +// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote +// and processes it. +func (l *channelLink) processRemoteUpdateAddHTLC( + msg *lnwire.UpdateAddHTLC) error { + + if l.IsFlushing(Incoming) { + // This is forbidden by the protocol specification. The best + // chance we have to deal with this is to drop the connection. + // This should roll back the channel state to the last + // CommitSig. If the remote has already sent a CommitSig we + // haven't received yet, channel state will be re-synchronized + // with a ChannelReestablish message upon reconnection and the + // protocol state that caused us to flush the link will be + // rolled back. In the event that there was some + // non-deterministic behavior in the remote that caused them to + // violate the protocol, we have a decent shot at correcting it + // this way, since reconnecting will put us in the cleanest + // possible state to try again. + // + // In addition to the above, it is possible for us to hit this + // case in situations where we improperly handle message + // ordering due to concurrency choices. An issue has been filed + // to address this here: + // https://github.com/lightningnetwork/lnd/issues/8393 + err := errors.New("received add while link is flushing") + l.failf( + LinkFailureError{ + code: ErrInvalidUpdate, + FailureAction: LinkFailureDisconnect, + PermanentFailure: false, + Warning: true, + }, err.Error(), + ) + + return err + } + + // Disallow htlcs with blinding points set if we haven't enabled the + // feature. This saves us from having to process the onion at all, but + // will only catch blinded payments where we are a relaying node (as the + // blinding point will be in the payload when we're the introduction + // node).
+ if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding { + err := errors.New("blinding point included when route " + "blinding is disabled") + + l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error()) + + return err + } + + // We have to check the limit here rather than later in the switch + // because the counterparty can keep sending HTLC's without sending a + // revoke. This would mean that the switch check would only occur later. + if l.isOverexposedWithHtlc(msg, true) { + err := errors.New("peer sent us an HTLC that exceeded our " + "max fee exposure") + l.failf(LinkFailureError{code: ErrInternalError}, err.Error()) + + return err + } + + // We just received an add request from an upstream peer, so we add it + // to our state machine, then add the HTLC to our "settle" list in the + // event that we know the preimage. + index, err := l.channel.ReceiveHTLC(msg) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream add HTLC: %v", err) + + return err + } + + l.log.Tracef("receive upstream htlc with payment hash(%x), "+ + "assigning index: %v", msg.PaymentHash[:], index) + + return nil +} + +// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the +// remote and processes it. +func (l *channelLink) processRemoteUpdateFulfillHTLC( + msg *lnwire.UpdateFulfillHTLC) error { + + pre := msg.PaymentPreimage + idx := msg.ID + + // Before we pipeline the settle, we'll check the set of active htlc's + // to see if the related UpdateAddHTLC has been fully locked-in. + var lockedin bool + htlcs := l.channel.ActiveHtlcs() + for _, add := range htlcs { + // The HTLC will be outgoing and match idx. + if !add.Incoming && add.HtlcIndex == idx { + lockedin = true + break + } + } + + if !lockedin { + err := errors.New("unable to handle upstream settle") + l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error()) + + return err + } + + if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { + l.failf( + LinkFailureError{ + code: ErrInvalidUpdate, + FailureAction: LinkFailureForceClose, + }, + "unable to handle upstream settle HTLC: %v", err, + ) + + return err + } + + settlePacket := &htlcPacket{ + outgoingChanID: l.ShortChanID(), + outgoingHTLCID: idx, + htlc: &lnwire.UpdateFulfillHTLC{ + PaymentPreimage: pre, + }, + } + + // Add the newly discovered preimage to our growing list of uncommitted + // preimages. These will be written to the witness cache just before + // accepting the next commitment signature from the remote peer. + l.uncommittedPreimages = append(l.uncommittedPreimages, pre) + + // Pipeline this settle, send it to the switch. + go l.forwardBatch(false, settlePacket) + + return nil +} + +// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg +// sent from the remote and processes it. +func (l *channelLink) processRemoteUpdateFailMalformedHTLC( + msg *lnwire.UpdateFailMalformedHTLC) error { + + // Convert the failure type encoded within the HTLC fail message to the + // proper generic lnwire error code. + var failure lnwire.FailureMessage + switch msg.FailureCode { + case lnwire.CodeInvalidOnionVersion: + failure = &lnwire.FailInvalidOnionVersion{ + OnionSHA256: msg.ShaOnionBlob, + } + case lnwire.CodeInvalidOnionHmac: + failure = &lnwire.FailInvalidOnionHmac{ + OnionSHA256: msg.ShaOnionBlob, + } + + case lnwire.CodeInvalidOnionKey: + failure = &lnwire.FailInvalidOnionKey{ + OnionSHA256: msg.ShaOnionBlob, + } + + // Handle malformed errors that are part of a blinded route.
This case + // is slightly different, because we expect every relaying node in the + // blinded portion of the route to send malformed errors. If we're also + // a relaying node, we're likely going to switch this error out anyway + // for our own malformed error, but we handle the case here for + // completeness. + case lnwire.CodeInvalidBlinding: + failure = &lnwire.FailInvalidBlinding{ + OnionSHA256: msg.ShaOnionBlob, + } + + default: + l.log.Warnf("unexpected failure code received in "+ + "UpdateFailMalformedHTLC: %v", msg.FailureCode) + + // We don't just pass back the error we received from our + // successor. Otherwise we might report a failure that penalizes + // us more than needed. If the onion that we forwarded was + // correct, the node should have been able to send back its own + // failure. The node did not send back its own failure, so we + // assume there was a problem with the onion and report that + // back. We reuse the invalid onion key failure because there is + // no specific error for this case. + failure = &lnwire.FailInvalidOnionKey{ + OnionSHA256: msg.ShaOnionBlob, + } + } + + // With the error parsed, we'll convert it into its opaque form. + var b bytes.Buffer + if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { + return fmt.Errorf("unable to encode malformed error: %w", err) + } + + // If the remote side has been unable to parse the onion blob we have + // sent to it, then we should transform the malformed HTLC message to + // the usual HTLC fail message. + err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) + + return err + } + + return nil +} + +// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the +// remote and processes it. +func (l *channelLink) processRemoteUpdateFailHTLC( + msg *lnwire.UpdateFailHTLC) error { + + // Verify that the failure reason is at least 256 bytes plus overhead. + const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32 + + if len(msg.Reason) < minimumFailReasonLength { + // We've received a reason with a non-compliant length. Older + // nodes happily relay back these failures that may originate + // from a node further downstream. Therefore we can't just fail + // the channel. + // + // We want to be compliant ourselves, so we also can't pass back + // the reason unmodified. And we must make sure that we don't + // hit the magic length check of 260 bytes in + // processRemoteSettleFails either. + // + // Because the reason is unreadable for the payer anyway, we + // just replace it by a compliant-length series of random bytes. + msg.Reason = make([]byte, minimumFailReasonLength) + _, err := crand.Read(msg.Reason[:]) + if err != nil { + return fmt.Errorf("random generation error: %w", err) + } + } + + // Add fail to the update log. + idx := msg.ID + err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) + + return err + } + + return nil +} + +// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteCommitSig(ctx context.Context, + msg *lnwire.CommitSig) error { + + // Since we may have learned new preimages for the first time, we'll add + // them to our preimage cache.
By doing this, we ensure any contested + // contracts watched by any on-chain arbitrators can now sweep this HTLC + // on-chain. We delay committing the preimages until just before + // accepting the new remote commitment, as afterwards the peer won't + // resend the Settle messages on the next channel reestablishment. Doing + // so allows us to more effectively batch this operation, instead of + // doing a single write per preimage. + err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) + if err != nil { + l.failf( + LinkFailureError{code: ErrInternalError}, + "unable to add preimages=%v to cache: %v", + l.uncommittedPreimages, err, + ) + + return err + } + + // Instead of truncating the slice to conserve memory allocations, we + // simply set the uncommitted preimage slice to nil so that a new one + // will be initialized if any more witnesses are discovered. We do this + // because the maximum size that the slice can occupy is 15KB, and we + // want to ensure we release that memory back to the runtime. + l.uncommittedPreimages = nil + + // We just received a new update to our local commitment chain, + // validate this new commitment, closing the link if invalid. + auxSigBlob, err := msg.CustomRecords.Serialize() + if err != nil { + l.failf( + LinkFailureError{code: ErrInvalidCommitment}, + "unable to serialize custom records: %v", err, + ) + + return err + } + err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ + CommitSig: msg.CommitSig, + HtlcSigs: msg.HtlcSigs, + PartialSig: msg.PartialSig, + AuxSigBlob: auxSigBlob, + }) + if err != nil { + // If we were unable to reconstruct their proposed commitment, + // then we'll examine the type of error. If it's an + // InvalidCommitSigError, then we'll send a direct error. + var sendData []byte + switch { + case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err): + sendData = []byte(err.Error()) + case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err): + sendData = []byte(err.Error()) + } + l.failf( + LinkFailureError{ + code: ErrInvalidCommitment, + FailureAction: LinkFailureForceClose, + SendData: sendData, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As we've just accepted a new state, we'll now immediately send the + // remote peer a revocation for our prior state. + nextRevocation, currentHtlcs, finalHTLCs, err := + l.channel.RevokeCurrentCommitment() + if err != nil { + l.log.Errorf("unable to revoke commitment: %v", err) + + // We need to fail the channel in case revoking our local + // commitment does not succeed. We might have already advanced + // our channel state which would lead us to proceed with an + // unclean state. + // + // NOTE: We do not trigger a force close because this could + // resolve itself in case our db was just busy not accepting new + // transactions. + l.failf( + LinkFailureError{ + code: ErrInternalError, + Warning: true, + FailureAction: LinkFailureDisconnect, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As soon as we are ready to send our next revocation, we can invoke + // the incoming commit hooks. + l.Lock() + l.incomingCommitHooks.invoke() + l.Unlock() + + err = l.cfg.Peer.SendMessage(false, nextRevocation) + if err != nil { + l.log.Errorf("failed to send RevokeAndAck: %v", err) + } + + // Notify the incoming htlcs of which the resolutions were locked in.
+ for id, settled := range finalHTLCs { + l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( + models.CircuitKey{ + ChanID: l.ShortChanID(), + HtlcID: id, + }, + channeldb.FinalHtlcInfo{ + Settled: settled, + Offchain: true, + }, + ) + } + + // Since we just revoked our commitment, we may have a new set of HTLC's + // on our commitment, so we'll send them using our function closure + // NotifyContractUpdate. + newUpdate := &contractcourt.ContractUpdate{ + HtlcKey: contractcourt.LocalHtlcSet, + Htlcs: currentHtlcs, + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + return fmt.Errorf("unable to notify contract update: %w", err) + } + + select { + case <-l.cg.Done(): + return nil + default: + } + + // If the remote party initiated the state transition, we'll reply with + // a signature to provide them with their version of the latest + // commitment. Otherwise, both commitment chains are fully synced from + // our PoV, then we don't need to reply with a signature as both sides + // already have a commitment with the latest accepted. + if l.channel.OweCommitment() { + if !l.updateCommitTxOrFail(ctx) { + return nil + } + } + + // If we need to send out an Stfu, this would be the time to do so. + if l.noDanglingUpdates(lntypes.Local) { + err = l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("sendOwedStfu: %v", err.Error()) + } + } + + // Now that we have finished processing the incoming CommitSig and sent + // out our RevokeAndAck, we invoke the flushHooks if the channel state + // is clean. + l.Lock() + if l.channel.IsChannelClean() { + l.flushHooks.invoke() + } + l.Unlock() + + return nil +} + +// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context, + msg *lnwire.RevokeAndAck) error { + + // We've received a revocation from the remote chain, if valid, this + // moves the remote chain forward, and expands our revocation window. + + // We now process the message and advance our remote commit chain. + fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg) + if err != nil { + // TODO(halseth): force close? + l.failf( + LinkFailureError{ + code: ErrInvalidRevocation, + FailureAction: LinkFailureDisconnect, + }, + "unable to accept revocation: %v", err, + ) + + return err + } + + // The remote party now has a new primary commitment, so we'll update + // the contract court to be aware of this new set (the prior old remote + // pending). + newUpdate := &contractcourt.ContractUpdate{ + HtlcKey: contractcourt.RemoteHtlcSet, + Htlcs: remoteHTLCs, + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + return fmt.Errorf("unable to notify contract update: %w", err) + } + + select { + case <-l.cg.Done(): + return nil + default: + } + + // If we have a tower client for this channel type, we'll create a + // backup for the current state. + if l.cfg.TowerClient != nil { + state := l.channel.State() + chanID := l.ChanID() + + err = l.cfg.TowerClient.BackupState( + &chanID, state.RemoteCommitment.CommitHeight-1, + ) + if err != nil { + l.failf(LinkFailureError{ + code: ErrInternalError, + }, "unable to queue breach backup: %v", err) + + return err + } + } + + // If we can send updates then we can process adds in case we are the + // exit hop and need to send back resolutions, or in case there are + // validity issues with the packets. Otherwise we defer the action until + // resume. 
+	//
+	// We are free to process the settles and fails without this check since
+	// processing those can't result in further updates to this channel
+	// link.
+	if l.quiescer.CanSendUpdates() {
+		l.processRemoteAdds(fwdPkg)
+	} else {
+		l.quiescer.OnResume(func() {
+			l.processRemoteAdds(fwdPkg)
+		})
+	}
+	l.processRemoteSettleFails(fwdPkg)
+
+	// If the link failed while processing the adds, we must return to
+	// ensure we won't attempt to update the state further.
+	if l.failed {
+		return nil
+	}
+
+	// The revocation window opened up. If there are pending local updates,
+	// try to update the commit tx. Pending updates could already have been
+	// present because of a previously failed update to the commit tx or
+	// freshly added in by processRemoteAdds. Also, in case there are no
+	// local updates but there are still remote updates that are not yet in
+	// the remote commit tx, send out an update.
+	if l.channel.OweCommitment() {
+		if !l.updateCommitTxOrFail(ctx) {
+			return nil
+		}
+	}
+
+	// Now that we have finished processing the RevokeAndAck, we can invoke
+	// the flushHooks if the channel state is clean.
+	l.Lock()
+	if l.channel.IsChannelClean() {
+		l.flushHooks.invoke()
+	}
+	l.Unlock()
+
+	return nil
+}
+
+// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
+// processes it.
+func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
+	// Check and see if their proposed fee-rate would make us exceed the fee
+	// threshold.
+	fee := chainfee.SatPerKWeight(msg.FeePerKw)
+
+	isDust, err := l.exceedsFeeExposureLimit(fee)
+	if err != nil {
+		// This shouldn't typically happen. If it does, it indicates
+		// something is wrong with our channel state.
+		l.log.Errorf("Unable to determine if fee threshold "+
+			"exceeded: %v", err)
+		l.failf(LinkFailureError{code: ErrInternalError},
+			"error calculating fee exposure: %v", err)
+
+		return err
+	}
+
+	if isDust {
+		// The proposed fee-rate makes us exceed the fee threshold.
+		err = fmt.Errorf("fee threshold exceeded with fee rate %v",
+			fee)
+		l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)
+
+		return err
+	}
+
+	// We received a fee update from the peer. If we are the initiator we
+	// will fail the channel; if not, we will apply the update.
+	if err := l.channel.ReceiveUpdateFee(fee); err != nil {
+		l.failf(LinkFailureError{code: ErrInvalidUpdate},
+			"error receiving fee update: %v", err)
+		return err
+	}
+
+	// Update the mailbox's feerate as well.
+	l.mailBox.SetFeeRate(fee)
+
+	return nil
+}
+
+// processRemoteError takes an `Error` msg sent from the remote and fails the
+// channel link.
+func (l *channelLink) processRemoteError(msg *lnwire.Error) {
+	// Error received from remote, MUST fail channel, but should only print
+	// the contents of the error message if all characters are printable
+	// ASCII.
+	l.failf(
+		// TODO(halseth): we currently don't fail the channel
+		// permanently, as there are some sync issues with other
+		// implementations that will lead to them sending an
+		// error message, but we can recover from on next
+		// connection. See
+		// https://github.com/ElementsProject/lightning/issues/4212
+		LinkFailureError{
+			code:             ErrRemoteError,
+			PermanentFailure: false,
+		},
+		"ChannelPoint(%v): received error from peer: %v",
+		l.channel.ChannelPoint(), msg.Error(),
+	)
+}
+
+// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` sent from the
+// local switch and processes it.
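+// The settle is applied to our local commitment state, the closed circuit is
+// queued for removal, the settle message is forwarded to the remote peer, and
+// a settle event is sent to the HtlcNotifier.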
+func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
+	pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {
+
+	// If hodl.SettleOutgoing mode is active, we exit early to simulate
+	// arbitrary delays between the switch adding the SETTLE to the mailbox,
+	// and the HTLC being added to the commitment state.
+	if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
+		l.log.Warnf(hodl.SettleOutgoing.Warning())
+		l.mailBox.AckPacket(pkt.inKey())
+
+		return
+	}
+
+	// An HTLC we forwarded through the switch has just settled somewhere
+	// upstream. Therefore we settle the HTLC within our local state
+	// machine.
+	inKey := pkt.inKey()
+	err := l.channel.SettleHTLC(
+		htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
+		pkt.destRef, &inKey,
+	)
+	if err != nil {
+		l.log.Errorf("unable to settle incoming HTLC for "+
+			"circuit-key=%v: %v", inKey, err)
+
+		// If the HTLC index for the Settle response was not known to
+		// our commitment state, it has already been cleaned up by a
+		// prior response. We'll thus try to clean up any lingering
+		// state to ensure we don't continue reforwarding.
+		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
+			l.cleanupSpuriousResponse(pkt)
+		}
+
+		// Remove the packet from the link's mailbox to ensure it
+		// doesn't get replayed after a reconnection.
+		l.mailBox.AckPacket(inKey)
+
+		return
+	}
+
+	l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
+		pkt.inKey(), pkt.outKey())
+
+	l.closedCircuits = append(l.closedCircuits, pkt.inKey())
+
+	// With the HTLC settled, we'll need to populate the wire message to
+	// target the specific channel and HTLC to be settled.
+	htlc.ChanID = l.ChanID()
+	htlc.ID = pkt.incomingHTLCID
+
+	// Then we send the HTLC settle message to the connected peer so we can
+	// continue the propagation of the settle message.
+	err = l.cfg.Peer.SendMessage(false, htlc)
+	if err != nil {
+		l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
+	}
+
+	// Send a settle event notification to htlcNotifier.
+	l.cfg.HtlcNotifier.NotifySettleEvent(
+		newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
+	)
+
+	// Immediately update the commitment tx to minimize latency.
+	l.updateCommitTxOrFail(ctx)
+}
+
+// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` sent from the local
+// switch and processes it.
+func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
+	pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {
+
+	// If hodl.FailOutgoing mode is active, we exit early to simulate
+	// arbitrary delays between the switch adding a FAIL to the mailbox, and
+	// the HTLC being added to the commitment state.
+	if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
+		l.log.Warnf(hodl.FailOutgoing.Warning())
+		l.mailBox.AckPacket(pkt.inKey())
+
+		return
+	}
+
+	// An HTLC cancellation has been triggered somewhere upstream, so we'll
+	// remove the HTLC from our local state machine.
+	inKey := pkt.inKey()
+	err := l.channel.FailHTLC(
+		pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
+		&inKey,
+	)
+	if err != nil {
+		l.log.Errorf("unable to cancel incoming HTLC for "+
+			"circuit-key=%v: %v", inKey, err)
+
+		// If the HTLC index for the Fail response was not known to our
+		// commitment state, it has already been cleaned up by a prior
+		// response. We'll thus try to clean up any lingering state to
+		// ensure we don't continue reforwarding.
+		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
+			l.cleanupSpuriousResponse(pkt)
+		}
+
+		// Remove the packet from the link's mailbox to ensure it
+		// doesn't get replayed after a reconnection.
+		l.mailBox.AckPacket(inKey)
+
+		return
+	}
+
+	l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
+		pkt.inKey(), pkt.outKey())
+
+	l.closedCircuits = append(l.closedCircuits, pkt.inKey())
+
+	// With the HTLC removed, we'll need to populate the wire message to
+	// target the specific channel and HTLC to be canceled. The "Reason"
+	// field will have already been set within the switch.
+	htlc.ChanID = l.ChanID()
+	htlc.ID = pkt.incomingHTLCID
+
+	// We send the HTLC message to the peer that initially created the
+	// HTLC. If the incoming blinding point is non-nil, we know that we are
+	// a relaying node in a blinded path. Otherwise, we're either an
+	// introduction node or not part of a blinded path at all.
+	err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
+	if err != nil {
+		l.log.Errorf("unable to send HTLC failure: %v", err)
+
+		return
+	}
+
+	// If the packet has a link failure set, the HTLC failed at our node, so
+	// we notify a link failure. Otherwise, it failed further down the route
+	// and we notify a forwarding failure.
+	if pkt.linkFailure != nil {
+		l.cfg.HtlcNotifier.NotifyLinkFailEvent(
+			newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
+			pkt.linkFailure, false,
+		)
+	} else {
+		l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
+			newHtlcKey(pkt), getEventType(pkt),
+		)
+	}
+
+	// Immediately update the commitment tx to minimize latency.
+	l.updateCommitTxOrFail(ctx)
+}