From 30f9257a6b4057387d97542bccfabf7cac766d15 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 20:41:46 +0800 Subject: [PATCH 01/22] htlcswitch: add handler `handleHtlcResolution` --- htlcswitch/link.go | 69 ++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3aa47caf3..0842a8c3c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1511,29 +1511,10 @@ func (l *channelLink) htlcManager(ctx context.Context) { // A htlc resolution is received. This means that we now have a // resolution for a previously accepted htlc. case hodlItem := <-l.hodlQueue.ChanOut(): - htlcResolution := hodlItem.(invoices.HtlcResolution) - err := l.processHodlQueue(ctx, htlcResolution) - switch err { - // No error, success. - case nil: - - // If the duplicate keystone error was encountered, - // fail back gracefully. - case ErrDuplicateKeystone: - l.failf(LinkFailureError{ - code: ErrCircuitError, - }, "process hodl queue: "+ - "temporary circuit error: %v", - err, - ) - - // Send an Error message to the peer. - default: - l.failf(LinkFailureError{ - code: ErrInternalError, - }, "process hodl queue: unable to update "+ - "commitment: %v", err, - ) + err := l.handleHtlcResolution(ctx, hodlItem) + if err != nil { + l.log.Errorf("failed to handle htlc "+ + "resolution: %v", err) } case qReq := <-l.quiescenceReqs: @@ -4582,3 +4563,45 @@ func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] { return l.channel.LocalCommitmentBlob() } + +// handleHtlcResolution takes an HTLC resolution and processes it by draining +// the hodlQueue. Once processed, a commit_sig is sent to the remote to update +// their commitment. +func (l *channelLink) handleHtlcResolution(ctx context.Context, + hodlItem any) error { + + htlcResolution, ok := hodlItem.(invoices.HtlcResolution) + if !ok { + return fmt.Errorf("expect HtlcResolution, got %T", hodlItem) + } + + err := l.processHodlQueue(ctx, htlcResolution) + // No error, success. + if err == nil { + return nil + + } + switch err { + // If the duplicate keystone error was encountered, fail back + // gracefully. + case ErrDuplicateKeystone: + l.failf( + LinkFailureError{ + code: ErrCircuitError, + }, + "process hodl queue: temporary circuit error: %v", err, + ) + + // Send an Error message to the peer. + default: + l.failf( + LinkFailureError{ + code: ErrInternalError, + }, + "process hodl queue: unable to update commitment: %v", + err, + ) + } + + return err +} From e8b20354845604f48acf864f4940bd57c59eb9e2 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 20:45:28 +0800 Subject: [PATCH 02/22] htlcswitch: add handler `handleQuiescenceReq` --- htlcswitch/link.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 0842a8c3c..441ea5fcc 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1517,18 +1517,13 @@ func (l *channelLink) htlcManager(ctx context.Context) { "resolution: %v", err) } + // A user-initiated quiescence request is received. We now + // forward it to the quiescer. 
case qReq := <-l.quiescenceReqs: - l.quiescer.InitStfu(qReq) - - if l.noDanglingUpdates(lntypes.Local) { - err := l.quiescer.SendOwedStfu() - if err != nil { - l.stfuFailf( - "SendOwedStfu: %s", err.Error(), - ) - res := fn.Err[lntypes.ChannelParty](err) - qReq.Resolve(res) - } + err := l.handleQuiescenceReq(qReq) + if err != nil { + l.log.Errorf("failed handle quiescence "+ + "req: %v", err) } case <-l.cg.Done(): @@ -4605,3 +4600,22 @@ func (l *channelLink) handleHtlcResolution(ctx context.Context, return err } + +// handleQuiescenceReq takes a locally initialized (RPC) quiescence request and +// forwards it to the quiescer for further processing. +func (l *channelLink) handleQuiescenceReq(req StfuReq) error { + l.quiescer.InitStfu(req) + + if !l.noDanglingUpdates(lntypes.Local) { + return nil + } + + err := l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("SendOwedStfu: %s", err.Error()) + res := fn.Err[lntypes.ChannelParty](err) + req.Resolve(res) + } + + return err +} From cccb4474391cd69312cdfb0574507f1190274040 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 20:50:05 +0800 Subject: [PATCH 03/22] htlcswitch: add handler `handleUpdateFee` --- htlcswitch/link.go | 89 ++++++++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 441ea5fcc..68c5d1c6a 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1414,50 +1414,10 @@ func (l *channelLink) htlcManager(ctx context.Context) { // fee to see if we should adjust our commitment fee. case <-l.updateFeeTimer.C: l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout()) - - // If we're not the initiator of the channel, don't we - // don't control the fees, so we can ignore this. - if !l.channel.IsInitiator() { - continue - } - - // If we are the initiator, then we'll sample the - // current fee rate to get into the chain within 3 - // blocks. - netFee, err := l.sampleNetworkFee() + err := l.handleUpdateFee(ctx) if err != nil { - l.log.Errorf("unable to sample network fee: %v", - err) - continue - } - - minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW() - - newCommitFee := l.channel.IdealCommitFeeRate( - netFee, minRelayFee, - l.cfg.MaxAnchorsCommitFeeRate, - l.cfg.MaxFeeAllocation, - ) - - // We determine if we should adjust the commitment fee - // based on the current commitment fee, the suggested - // new commitment fee and the current minimum relay fee - // rate. - commitFee := l.channel.CommitFeeRate() - if !shouldAdjustCommitFee( - newCommitFee, commitFee, minRelayFee, - ) { - - continue - } - - // If we do, then we'll send a new UpdateFee message to - // the remote party, to be locked in with a new update. - err = l.updateChannelFee(ctx, newCommitFee) - if err != nil { - l.log.Errorf("unable to update fee rate: %v", - err) - continue + l.log.Errorf("failed to handle update fee: "+ + "%v", err) } // The underlying channel has notified us of a unilateral close @@ -4619,3 +4579,46 @@ func (l *channelLink) handleQuiescenceReq(req StfuReq) error { return err } + +// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to +// decide whether we should send an `update_fee` msg to update the commitment's +// feerate. +func (l *channelLink) handleUpdateFee(ctx context.Context) error { + // If we're not the initiator of the channel, we don't control the fees, + // so we can ignore this. 
+ if !l.channel.IsInitiator() { + return nil + } + + // If we are the initiator, then we'll sample the current fee rate to + // get into the chain within 3 blocks. + netFee, err := l.sampleNetworkFee() + if err != nil { + return fmt.Errorf("unable to sample network fee: %w", err) + } + + minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW() + + newCommitFee := l.channel.IdealCommitFeeRate( + netFee, minRelayFee, + l.cfg.MaxAnchorsCommitFeeRate, + l.cfg.MaxFeeAllocation, + ) + + // We determine if we should adjust the commitment fee based on the + // current commitment fee, the suggested new commitment fee and the + // current minimum relay fee rate. + commitFee := l.channel.CommitFeeRate() + if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) { + return nil + } + + // If we do, then we'll send a new UpdateFee message to the remote + // party, to be locked in with a new update. + err = l.updateChannelFee(ctx, newCommitFee) + if err != nil { + return fmt.Errorf("unable to update fee rate: %w", err) + } + + return nil +} From 09022b5f56206b4b8bf1ee3d9eae7f449cb1ea1b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 20:55:26 +0800 Subject: [PATCH 04/22] htlcswitch: add handler `toggleBatchTicker` --- htlcswitch/link.go | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 68c5d1c6a..5398978da 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1363,24 +1363,8 @@ func (l *channelLink) htlcManager(ctx context.Context) { return } - // If the previous event resulted in a non-empty batch, resume - // the batch ticker so that it can be cleared. Otherwise pause - // the ticker to prevent waking up the htlcManager while the - // batch is empty. - numUpdates := l.channel.NumPendingUpdates( - lntypes.Local, lntypes.Remote, - ) - if numUpdates > 0 { - l.cfg.BatchTicker.Resume() - l.log.Tracef("BatchTicker resumed, "+ - "NumPendingUpdates(Local, Remote)=%d", - numUpdates, - ) - } else { - l.cfg.BatchTicker.Pause() - l.log.Trace("BatchTicker paused due to zero " + - "NumPendingUpdates(Local, Remote)") - } + // Pause or resume the batch ticker. + l.toggleBatchTicker() select { // We have a new hook that needs to be run when we reach a clean @@ -4622,3 +4606,23 @@ func (l *channelLink) handleUpdateFee(ctx context.Context) error { return nil } + +// toggleBatchTicker checks whether we need to resume or pause the batch ticker. +// When we have no pending updates, the ticker is paused, otherwise resumed. +func (l *channelLink) toggleBatchTicker() { + // If the previous event resulted in a non-empty batch, resume the batch + // ticker so that it can be cleared. Otherwise pause the ticker to + // prevent waking up the htlcManager while the batch is empty. + numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) + if numUpdates > 0 { + l.cfg.BatchTicker.Resume() + l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+ + "Remote)=%d", numUpdates) + + return + } + + l.cfg.BatchTicker.Pause() + l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" + + "(Local, Remote)") +} From ca770bb4afccee33b142eda028760ebc99073411 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 20:56:50 +0800 Subject: [PATCH 05/22] htlcswitch: notify channel active after reforwarding The channel should only be considered active when its pending tasks are finished, which includes, 1. sync channel state via reestablish. 2. send previous shutdown msg. 3. 
reset the mailbox's packets. 4. reforwarding logs loaded from restart. When the above tasks are finished, the channel can be considered as fully resumed from its previous disconnection. --- htlcswitch/link.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 5398978da..d1f27fa12 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1307,13 +1307,6 @@ func (l *channelLink) htlcManager(ctx context.Context) { // allow the switch to forward HTLCs in the outbound direction. l.markReestablished() - // Now that we've received both channel_ready and channel reestablish, - // we can go ahead and send the active channel notification. We'll also - // defer the inactive notification for when the link exits to ensure - // that every active notification is matched by an inactive one. - l.cfg.NotifyActiveChannel(l.ChannelPoint()) - defer l.cfg.NotifyInactiveChannel(l.ChannelPoint()) - // With the channel states synced, we now reset the mailbox to ensure // we start processing all unacked packets in order. This is done here // to ensure that all acknowledgments that occur during channel @@ -1355,6 +1348,13 @@ func (l *channelLink) htlcManager(ctx context.Context) { go l.fwdPkgGarbager() } + // Now that we've received both channel_ready and channel reestablish, + // we can go ahead and send the active channel notification. We'll also + // defer the inactive notification for when the link exits to ensure + // that every active notification is matched by an inactive one. + l.cfg.NotifyActiveChannel(l.ChannelPoint()) + defer l.cfg.NotifyInactiveChannel(l.ChannelPoint()) + for { // We must always check if we failed at some point processing // the last update before processing the next. From 3af8281370136413e9f6a17249463a328fb7412f Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:07:32 +0800 Subject: [PATCH 06/22] htlcswitch: add handler `resumeLink` --- htlcswitch/link.go | 169 +++++++++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 74 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d1f27fa12..194415f4c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1272,80 +1272,12 @@ func (l *channelLink) htlcManager(ctx context.Context) { // TODO(roasbeef): need to call wipe chan whenever D/C? - // If this isn't the first time that this channel link has been - // created, then we'll need to check to see if we need to - // re-synchronize state with the remote peer. settledHtlcs is a map of - // HTLC's that we re-settled as part of the channel state sync. - if l.cfg.SyncStates { - err := l.syncChanStates(ctx) - if err != nil { - l.handleChanSyncErr(err) - return - } - } - - // If a shutdown message has previously been sent on this link, then we - // need to make sure that we have disabled any HTLC adds on the outgoing - // direction of the link and that we re-resend the same shutdown message - // that we previously sent. - l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { - // Immediately disallow any new outgoing HTLCs. - if !l.DisableAdds(Outgoing) { - l.log.Warnf("Outgoing link adds already disabled") - } - - // Re-send the shutdown message the peer. Since syncChanStates - // would have sent any outstanding CommitSig, it is fine for us - // to immediately queue the shutdown message now. 
- err := l.cfg.Peer.SendMessage(false, &shutdown) - if err != nil { - l.log.Warnf("Error sending shutdown message: %v", err) - } - }) - - // We've successfully reestablished the channel, mark it as such to - // allow the switch to forward HTLCs in the outbound direction. - l.markReestablished() - - // With the channel states synced, we now reset the mailbox to ensure - // we start processing all unacked packets in order. This is done here - // to ensure that all acknowledgments that occur during channel - // resynchronization have taken affect, causing us only to pull unacked - // packets after starting to read from the downstream mailbox. - l.mailBox.ResetPackets() - - // After cleaning up any memory pertaining to incoming packets, we now - // replay our forwarding packages to handle any htlcs that can be - // processed locally, or need to be forwarded out to the switch. We will - // only attempt to resolve packages if our short chan id indicates that - // the channel is not pending, otherwise we should have no htlcs to - // reforward. - if l.ShortChanID() != hop.Source { - err := l.resolveFwdPkgs(ctx) - switch err { - // No error was encountered, success. - case nil: - - // If the duplicate keystone error was encountered, we'll fail - // without sending an Error message to the peer. - case ErrDuplicateKeystone: - l.failf(LinkFailureError{code: ErrCircuitError}, - "temporary circuit error: %v", err) - return - - // A non-nil error was encountered, send an Error message to - // the peer. - default: - l.failf(LinkFailureError{code: ErrInternalError}, - "unable to resolve fwd pkgs: %v", err) - return - } - - // With our link's in-memory state fully reconstructed, spawn a - // goroutine to manage the reclamation of disk space occupied by - // completed forwarding packages. - l.cg.WgAdd(1) - go l.fwdPkgGarbager() + // If the link is not started for the first time, we need to take extra + // steps to resume its state. + err := l.resumeLink(ctx) + if err != nil { + l.log.Errorf("resuming link failed: %v", err) + return } // Now that we've received both channel_ready and channel reestablish, @@ -4626,3 +4558,92 @@ func (l *channelLink) toggleBatchTicker() { l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" + "(Local, Remote)") } + +// resumeLink is called when starting a previous link. It will go through the +// reestablishment protocol and reforwarding packets that are yet resolved. +func (l *channelLink) resumeLink(ctx context.Context) error { + // If this isn't the first time that this channel link has been created, + // then we'll need to check to see if we need to re-synchronize state + // with the remote peer. settledHtlcs is a map of HTLC's that we + // re-settled as part of the channel state sync. + if l.cfg.SyncStates { + err := l.syncChanStates(ctx) + if err != nil { + l.handleChanSyncErr(err) + + return err + } + } + + // If a shutdown message has previously been sent on this link, then we + // need to make sure that we have disabled any HTLC adds on the outgoing + // direction of the link and that we re-resend the same shutdown message + // that we previously sent. + // + // TODO(yy): we should either move this to chanCloser, or move all + // shutdown handling logic to be managed by the link, but not a mixed of + // partial management by two subsystems. + l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) { + // Immediately disallow any new outgoing HTLCs. 
+ if !l.DisableAdds(Outgoing) { + l.log.Warnf("Outgoing link adds already disabled") + } + + // Re-send the shutdown message the peer. Since syncChanStates + // would have sent any outstanding CommitSig, it is fine for us + // to immediately queue the shutdown message now. + err := l.cfg.Peer.SendMessage(false, &shutdown) + if err != nil { + l.log.Warnf("Error sending shutdown message: %v", err) + } + }) + + // We've successfully reestablished the channel, mark it as such to + // allow the switch to forward HTLCs in the outbound direction. + l.markReestablished() + + // With the channel states synced, we now reset the mailbox to ensure we + // start processing all unacked packets in order. This is done here to + // ensure that all acknowledgments that occur during channel + // resynchronization have taken affect, causing us only to pull unacked + // packets after starting to read from the downstream mailbox. + l.mailBox.ResetPackets() + + // If the channel is pending, there's no need to reforwarding packets. + if l.ShortChanID() == hop.Source { + return nil + } + + // After cleaning up any memory pertaining to incoming packets, we now + // replay our forwarding packages to handle any htlcs that can be + // processed locally, or need to be forwarded out to the switch. We will + // only attempt to resolve packages if our short chan id indicates that + // the channel is not pending, otherwise we should have no htlcs to + // reforward. + err := l.resolveFwdPkgs(ctx) + switch err { + // No error was encountered, success. + case nil: + // With our link's in-memory state fully reconstructed, spawn a + // goroutine to manage the reclamation of disk space occupied by + // completed forwarding packages. + l.cg.WgAdd(1) + go l.fwdPkgGarbager() + + return nil + + // If the duplicate keystone error was encountered, we'll fail + // without sending an Error message to the peer. + case ErrDuplicateKeystone: + l.failf(LinkFailureError{code: ErrCircuitError}, + "temporary circuit error: %v", err) + + // A non-nil error was encountered, send an Error message to + // the peer. + default: + l.failf(LinkFailureError{code: ErrInternalError}, + "unable to resolve fwd pkgs: %v", err) + } + + return err +} From 83f658f714e9ae60bef276204ff3c52a98b8e1e4 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:08:01 +0800 Subject: [PATCH 07/22] htlcswitch: remove TODO and nolint This old TODO is no longer relevant. --- htlcswitch/link.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 194415f4c..b200364a7 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1252,8 +1252,6 @@ func (l *channelLink) handleChanSyncErr(err error) { // and also the htlc trickle queue+timer for this active channels. // // NOTE: This MUST be run as a goroutine. -// -//nolint:funlen func (l *channelLink) htlcManager(ctx context.Context) { defer func() { l.cfg.BatchTicker.Stop() @@ -1270,8 +1268,6 @@ func (l *channelLink) htlcManager(ctx context.Context) { l.cfg.NotifyActiveLink(l.ChannelPoint()) defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint()) - // TODO(roasbeef): need to call wipe chan whenever D/C? - // If the link is not started for the first time, we need to take extra // steps to resume its state. 
err := l.resumeLink(ctx) From 212262bf884a3af77bea4ee497da09ba8076b86d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:32:05 +0800 Subject: [PATCH 08/22] htlcswitch: add `processRemoteUpdateAddHTLC` --- htlcswitch/link.go | 148 +++++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 67 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index b200364a7..27091e811 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1920,77 +1920,12 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, switch msg := msg.(type) { case *lnwire.UpdateAddHTLC: - if l.IsFlushing(Incoming) { - // This is forbidden by the protocol specification. - // The best chance we have to deal with this is to drop - // the connection. This should roll back the channel - // state to the last CommitSig. If the remote has - // already sent a CommitSig we haven't received yet, - // channel state will be re-synchronized with a - // ChannelReestablish message upon reconnection and the - // protocol state that caused us to flush the link will - // be rolled back. In the event that there was some - // non-deterministic behavior in the remote that caused - // them to violate the protocol, we have a decent shot - // at correcting it this way, since reconnecting will - // put us in the cleanest possible state to try again. - // - // In addition to the above, it is possible for us to - // hit this case in situations where we improperly - // handle message ordering due to concurrency choices. - // An issue has been filed to address this here: - // https://github.com/lightningnetwork/lnd/issues/8393 - l.failf( - LinkFailureError{ - code: ErrInvalidUpdate, - FailureAction: LinkFailureDisconnect, - PermanentFailure: false, - Warning: true, - }, - "received add while link is flushing", - ) - - return - } - - // Disallow htlcs with blinding points set if we haven't - // enabled the feature. This saves us from having to process - // the onion at all, but will only catch blinded payments - // where we are a relaying node (as the blinding point will - // be in the payload when we're the introduction node). - if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "blinding point included when route blinding "+ - "is disabled") - - return - } - - // We have to check the limit here rather than later in the - // switch because the counterparty can keep sending HTLC's - // without sending a revoke. This would mean that the switch - // check would only occur later. - if l.isOverexposedWithHtlc(msg, true) { - l.failf(LinkFailureError{code: ErrInternalError}, - "peer sent us an HTLC that exceeded our max "+ - "fee exposure") - - return - } - - // We just received an add request from an upstream peer, so we - // add it to our state machine, then add the HTLC to our - // "settle" list in the event that we know the preimage. 
- index, err := l.channel.ReceiveHTLC(msg) + err := l.processRemoteUpdateAddHTLC(msg) if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream add HTLC: %v", err) + l.log.Errorf("failed to process remote ADD: %v", err) return } - l.log.Tracef("receive upstream htlc with payment hash(%x), "+ - "assigning index: %v", msg.PaymentHash[:], index) - case *lnwire.UpdateFulfillHTLC: pre := msg.PaymentPreimage idx := msg.ID @@ -4643,3 +4578,82 @@ func (l *channelLink) resumeLink(ctx context.Context) error { return err } + +// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote +// and processes it. +func (l *channelLink) processRemoteUpdateAddHTLC( + msg *lnwire.UpdateAddHTLC) error { + + if l.IsFlushing(Incoming) { + // This is forbidden by the protocol specification. The best + // chance we have to deal with this is to drop the connection. + // This should roll back the channel state to the last + // CommitSig. If the remote has already sent a CommitSig we + // haven't received yet, channel state will be re-synchronized + // with a ChannelReestablish message upon reconnection and the + // protocol state that caused us to flush the link will be + // rolled back. In the event that there was some + // non-deterministic behavior in the remote that caused them to + // violate the protocol, we have a decent shot at correcting it + // this way, since reconnecting will put us in the cleanest + // possible state to try again. + // + // In addition to the above, it is possible for us to hit this + // case in situations where we improperly handle message + // ordering due to concurrency choices. An issue has been filed + // to address this here: + // https://github.com/lightningnetwork/lnd/issues/8393 + err := errors.New("received add while link is flushing") + l.failf( + LinkFailureError{ + code: ErrInvalidUpdate, + FailureAction: LinkFailureDisconnect, + PermanentFailure: false, + Warning: true, + }, err.Error(), + ) + + return err + } + + // Disallow htlcs with blinding points set if we haven't enabled the + // feature. This saves us from having to process the onion at all, but + // will only catch blinded payments where we are a relaying node (as the + // blinding point will be in the payload when we're the introduction + // node). + if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding { + err := errors.New("blinding point included when route " + + "blinding is disabled") + + l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error()) + + return err + } + + // We have to check the limit here rather than later in the switch + // because the counterparty can keep sending HTLC's without sending a + // revoke. This would mean that the switch check would only occur later. + if l.isOverexposedWithHtlc(msg, true) { + err := errors.New("peer sent us an HTLC that exceeded our " + + "max fee exposure") + l.failf(LinkFailureError{code: ErrInternalError}, err.Error()) + + return err + } + + // We just received an add request from an upstream peer, so we add it + // to our state machine, then add the HTLC to our "settle" list in the + // event that we know the preimage. 
+ index, err := l.channel.ReceiveHTLC(msg) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream add HTLC: %v", err) + + return err + } + + l.log.Tracef("receive upstream htlc with payment hash(%x), "+ + "assigning index: %v", msg.PaymentHash[:], index) + + return nil +} From f9df5a9f518ec639a992e583640d1571ad9f1430 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:34:07 +0800 Subject: [PATCH 09/22] htlcswitch: add `processRemoteUpdateFulfillHTLC` --- htlcswitch/link.go | 110 +++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 27091e811..254b56f8c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1927,58 +1927,14 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.UpdateFulfillHTLC: - pre := msg.PaymentPreimage - idx := msg.ID + err := l.processRemoteUpdateFulfillHTLC(msg) + if err != nil { + l.log.Errorf("failed to process remote fulfill: %v", + err) - // Before we pipeline the settle, we'll check the set of active - // htlc's to see if the related UpdateAddHTLC has been fully - // locked-in. - var lockedin bool - htlcs := l.channel.ActiveHtlcs() - for _, add := range htlcs { - // The HTLC will be outgoing and match idx. - if !add.Incoming && add.HtlcIndex == idx { - lockedin = true - break - } - } - - if !lockedin { - l.failf( - LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream settle", - ) return } - if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { - l.failf( - LinkFailureError{ - code: ErrInvalidUpdate, - FailureAction: LinkFailureForceClose, - }, - "unable to handle upstream settle HTLC: %v", err, - ) - return - } - - settlePacket := &htlcPacket{ - outgoingChanID: l.ShortChanID(), - outgoingHTLCID: idx, - htlc: &lnwire.UpdateFulfillHTLC{ - PaymentPreimage: pre, - }, - } - - // Add the newly discovered preimage to our growing list of - // uncommitted preimage. These will be written to the witness - // cache just before accepting the next commitment signature - // from the remote peer. - l.uncommittedPreimages = append(l.uncommittedPreimages, pre) - - // Pipeline this settle, send it to the switch. - go l.forwardBatch(false, settlePacket) - case *lnwire.UpdateFailMalformedHTLC: // Convert the failure type encoded within the HTLC fail // message to the proper generic lnwire error code. @@ -4657,3 +4613,61 @@ func (l *channelLink) processRemoteUpdateAddHTLC( return nil } + +// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the +// remote and processes it. +func (l *channelLink) processRemoteUpdateFulfillHTLC( + msg *lnwire.UpdateFulfillHTLC) error { + + pre := msg.PaymentPreimage + idx := msg.ID + + // Before we pipeline the settle, we'll check the set of active htlc's + // to see if the related UpdateAddHTLC has been fully locked-in. + var lockedin bool + htlcs := l.channel.ActiveHtlcs() + for _, add := range htlcs { + // The HTLC will be outgoing and match idx. 
+ if !add.Incoming && add.HtlcIndex == idx { + lockedin = true + break + } + } + + if !lockedin { + err := errors.New("unable to handle upstream settle") + l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error()) + + return err + } + + if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { + l.failf( + LinkFailureError{ + code: ErrInvalidUpdate, + FailureAction: LinkFailureForceClose, + }, + "unable to handle upstream settle HTLC: %v", err, + ) + + return err + } + + settlePacket := &htlcPacket{ + outgoingChanID: l.ShortChanID(), + outgoingHTLCID: idx, + htlc: &lnwire.UpdateFulfillHTLC{ + PaymentPreimage: pre, + }, + } + + // Add the newly discovered preimage to our growing list of uncommitted + // preimage. These will be written to the witness cache just before + // accepting the next commitment signature from the remote peer. + l.uncommittedPreimages = append(l.uncommittedPreimages, pre) + + // Pipeline this settle, send it to the switch. + go l.forwardBatch(false, settlePacket) + + return nil +} From e883a51a638c5ec6c805e1514bf239b35e4c0317 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:36:40 +0800 Subject: [PATCH 10/22] htlcswitch: add `processRemoteUpdateFailMalformedHTLC` --- htlcswitch/link.go | 137 +++++++++++++++++++++++++-------------------- 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 254b56f8c..2dd7ce2c0 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1936,69 +1936,11 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.UpdateFailMalformedHTLC: - // Convert the failure type encoded within the HTLC fail - // message to the proper generic lnwire error code. - var failure lnwire.FailureMessage - switch msg.FailureCode { - case lnwire.CodeInvalidOnionVersion: - failure = &lnwire.FailInvalidOnionVersion{ - OnionSHA256: msg.ShaOnionBlob, - } - case lnwire.CodeInvalidOnionHmac: - failure = &lnwire.FailInvalidOnionHmac{ - OnionSHA256: msg.ShaOnionBlob, - } - - case lnwire.CodeInvalidOnionKey: - failure = &lnwire.FailInvalidOnionKey{ - OnionSHA256: msg.ShaOnionBlob, - } - - // Handle malformed errors that are part of a blinded route. - // This case is slightly different, because we expect every - // relaying node in the blinded portion of the route to send - // malformed errors. If we're also a relaying node, we're - // likely going to switch this error out anyway for our own - // malformed error, but we handle the case here for - // completeness. - case lnwire.CodeInvalidBlinding: - failure = &lnwire.FailInvalidBlinding{ - OnionSHA256: msg.ShaOnionBlob, - } - - default: - l.log.Warnf("unexpected failure code received in "+ - "UpdateFailMailformedHTLC: %v", msg.FailureCode) - - // We don't just pass back the error we received from - // our successor. Otherwise we might report a failure - // that penalizes us more than needed. If the onion that - // we forwarded was correct, the node should have been - // able to send back its own failure. The node did not - // send back its own failure, so we assume there was a - // problem with the onion and report that back. We reuse - // the invalid onion key failure because there is no - // specific error for this case. - failure = &lnwire.FailInvalidOnionKey{ - OnionSHA256: msg.ShaOnionBlob, - } - } - - // With the error parsed, we'll convert the into it's opaque - // form. 
- var b bytes.Buffer - if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { - l.log.Errorf("unable to encode malformed error: %v", err) - return - } - - // If remote side have been unable to parse the onion blob we - // have sent to it, than we should transform the malformed HTLC - // message to the usual HTLC fail message. - err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) + err := l.processRemoteUpdateFailMalformedHTLC(msg) if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream fail HTLC: %v", err) + l.log.Errorf("failed to process remote fail "+ + "malformed: %v", err) + return } @@ -4671,3 +4613,74 @@ func (l *channelLink) processRemoteUpdateFulfillHTLC( return nil } + +// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg +// sent from the remote and processes it. +func (l *channelLink) processRemoteUpdateFailMalformedHTLC( + msg *lnwire.UpdateFailMalformedHTLC) error { + + // Convert the failure type encoded within the HTLC fail message to the + // proper generic lnwire error code. + var failure lnwire.FailureMessage + switch msg.FailureCode { + case lnwire.CodeInvalidOnionVersion: + failure = &lnwire.FailInvalidOnionVersion{ + OnionSHA256: msg.ShaOnionBlob, + } + case lnwire.CodeInvalidOnionHmac: + failure = &lnwire.FailInvalidOnionHmac{ + OnionSHA256: msg.ShaOnionBlob, + } + + case lnwire.CodeInvalidOnionKey: + failure = &lnwire.FailInvalidOnionKey{ + OnionSHA256: msg.ShaOnionBlob, + } + + // Handle malformed errors that are part of a blinded route. This case + // is slightly different, because we expect every relaying node in the + // blinded portion of the route to send malformed errors. If we're also + // a relaying node, we're likely going to switch this error out anyway + // for our own malformed error, but we handle the case here for + // completeness. + case lnwire.CodeInvalidBlinding: + failure = &lnwire.FailInvalidBlinding{ + OnionSHA256: msg.ShaOnionBlob, + } + + default: + l.log.Warnf("unexpected failure code received in "+ + "UpdateFailMailformedHTLC: %v", msg.FailureCode) + + // We don't just pass back the error we received from our + // successor. Otherwise we might report a failure that penalizes + // us more than needed. If the onion that we forwarded was + // correct, the node should have been able to send back its own + // failure. The node did not send back its own failure, so we + // assume there was a problem with the onion and report that + // back. We reuse the invalid onion key failure because there is + // no specific error for this case. + failure = &lnwire.FailInvalidOnionKey{ + OnionSHA256: msg.ShaOnionBlob, + } + } + + // With the error parsed, we'll convert the into it's opaque form. + var b bytes.Buffer + if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { + return fmt.Errorf("unable to encode malformed error: %w", err) + } + + // If remote side have been unable to parse the onion blob we have sent + // to it, than we should transform the malformed HTLC message to the + // usual HTLC fail message. 
+ err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) + + return err + } + + return nil +} From ad1566ee3fccbb7636eb00095db02fd730e0aa27 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:38:13 +0800 Subject: [PATCH 11/22] htlcswitch: add `processRemoteUpdateFailHTLC` --- htlcswitch/link.go | 76 ++++++++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2dd7ce2c0..444ff82a1 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1945,40 +1945,9 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.UpdateFailHTLC: - // Verify that the failure reason is at least 256 bytes plus - // overhead. - const minimumFailReasonLength = lnwire.FailureMessageLength + - 2 + 2 + 32 - - if len(msg.Reason) < minimumFailReasonLength { - // We've received a reason with a non-compliant length. - // Older nodes happily relay back these failures that - // may originate from a node further downstream. - // Therefore we can't just fail the channel. - // - // We want to be compliant ourselves, so we also can't - // pass back the reason unmodified. And we must make - // sure that we don't hit the magic length check of 260 - // bytes in processRemoteSettleFails either. - // - // Because the reason is unreadable for the payer - // anyway, we just replace it by a compliant-length - // series of random bytes. - msg.Reason = make([]byte, minimumFailReasonLength) - _, err := crand.Read(msg.Reason[:]) - if err != nil { - l.log.Errorf("Random generation error: %v", err) - - return - } - } - - // Add fail to the update log. - idx := msg.ID - err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) + err := l.processRemoteUpdateFailHTLC(msg) if err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "unable to handle upstream fail HTLC: %v", err) + l.log.Errorf("failed to process remote fail: %v", err) return } @@ -4684,3 +4653,44 @@ func (l *channelLink) processRemoteUpdateFailMalformedHTLC( return nil } + +// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the +// remote and processes it. +func (l *channelLink) processRemoteUpdateFailHTLC( + msg *lnwire.UpdateFailHTLC) error { + + // Verify that the failure reason is at least 256 bytes plus overhead. + const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32 + + if len(msg.Reason) < minimumFailReasonLength { + // We've received a reason with a non-compliant length. Older + // nodes happily relay back these failures that may originate + // from a node further downstream. Therefore we can't just fail + // the channel. + // + // We want to be compliant ourselves, so we also can't pass back + // the reason unmodified. And we must make sure that we don't + // hit the magic length check of 260 bytes in + // processRemoteSettleFails either. + // + // Because the reason is unreadable for the payer anyway, we + // just replace it by a compliant-length series of random bytes. + msg.Reason = make([]byte, minimumFailReasonLength) + _, err := crand.Read(msg.Reason[:]) + if err != nil { + return fmt.Errorf("random generation error: %w", err) + } + } + + // Add fail to the update log. 
+ idx := msg.ID + err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) + if err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) + + return err + } + + return nil +} From 04abf96f60fd564061f06bc2da4d54da19396dce Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:41:41 +0800 Subject: [PATCH 12/22] htlcswitch: add `processRemoteCommitSig` --- htlcswitch/link.go | 345 +++++++++++++++++++++++---------------------- 1 file changed, 176 insertions(+), 169 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 444ff82a1..72b4a10e9 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1952,180 +1952,14 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.CommitSig: - // Since we may have learned new preimages for the first time, - // we'll add them to our preimage cache. By doing this, we - // ensure any contested contracts watched by any on-chain - // arbitrators can now sweep this HTLC on-chain. We delay - // committing the preimages until just before accepting the new - // remote commitment, as afterwards the peer won't resend the - // Settle messages on the next channel reestablishment. Doing so - // allows us to more effectively batch this operation, instead - // of doing a single write per preimage. - err := l.cfg.PreimageCache.AddPreimages( - l.uncommittedPreimages..., - ) + err := l.processRemoteCommitSig(ctx, msg) if err != nil { - l.failf( - LinkFailureError{code: ErrInternalError}, - "unable to add preimages=%v to cache: %v", - l.uncommittedPreimages, err, - ) - return - } - - // Instead of truncating the slice to conserve memory - // allocations, we simply set the uncommitted preimage slice to - // nil so that a new one will be initialized if any more - // witnesses are discovered. We do this because the maximum size - // that the slice can occupy is 15KB, and we want to ensure we - // release that memory back to the runtime. - l.uncommittedPreimages = nil - - // We just received a new updates to our local commitment - // chain, validate this new commitment, closing the link if - // invalid. - auxSigBlob, err := msg.CustomRecords.Serialize() - if err != nil { - l.failf( - LinkFailureError{code: ErrInvalidCommitment}, - "unable to serialize custom records: %v", err, - ) - - return - } - err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ - CommitSig: msg.CommitSig, - HtlcSigs: msg.HtlcSigs, - PartialSig: msg.PartialSig, - AuxSigBlob: auxSigBlob, - }) - if err != nil { - // If we were unable to reconstruct their proposed - // commitment, then we'll examine the type of error. If - // it's an InvalidCommitSigError, then we'll send a - // direct error. - var sendData []byte - switch err.(type) { - case *lnwallet.InvalidCommitSigError: - sendData = []byte(err.Error()) - case *lnwallet.InvalidHtlcSigError: - sendData = []byte(err.Error()) - } - l.failf( - LinkFailureError{ - code: ErrInvalidCommitment, - FailureAction: LinkFailureForceClose, - SendData: sendData, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As we've just accepted a new state, we'll now - // immediately send the remote peer a revocation for our prior - // state. 
- nextRevocation, currentHtlcs, finalHTLCs, err := - l.channel.RevokeCurrentCommitment() - if err != nil { - l.log.Errorf("unable to revoke commitment: %v", err) - - // We need to fail the channel in case revoking our - // local commitment does not succeed. We might have - // already advanced our channel state which would lead - // us to proceed with an unclean state. - // - // NOTE: We do not trigger a force close because this - // could resolve itself in case our db was just busy - // not accepting new transactions. - l.failf( - LinkFailureError{ - code: ErrInternalError, - Warning: true, - FailureAction: LinkFailureDisconnect, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As soon as we are ready to send our next revocation, we can - // invoke the incoming commit hooks. - l.RWMutex.Lock() - l.incomingCommitHooks.invoke() - l.RWMutex.Unlock() - - l.cfg.Peer.SendMessage(false, nextRevocation) - - // Notify the incoming htlcs of which the resolutions were - // locked in. - for id, settled := range finalHTLCs { - l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( - models.CircuitKey{ - ChanID: l.ShortChanID(), - HtlcID: id, - }, - channeldb.FinalHtlcInfo{ - Settled: settled, - Offchain: true, - }, - ) - } - - // Since we just revoked our commitment, we may have a new set - // of HTLC's on our commitment, so we'll send them using our - // function closure NotifyContractUpdate. - newUpdate := &contractcourt.ContractUpdate{ - HtlcKey: contractcourt.LocalHtlcSet, - Htlcs: currentHtlcs, - } - err = l.cfg.NotifyContractUpdate(newUpdate) - if err != nil { - l.log.Errorf("unable to notify contract update: %v", + l.log.Errorf("failed to process remote commit sig: %v", err) + return } - select { - case <-l.cg.Done(): - return - default: - } - - // If the remote party initiated the state transition, - // we'll reply with a signature to provide them with their - // version of the latest commitment. Otherwise, both commitment - // chains are fully synced from our PoV, then we don't need to - // reply with a signature as both sides already have a - // commitment with the latest accepted. - if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail(ctx) { - return - } - } - - // If we need to send out an Stfu, this would be the time to do - // so. - if l.noDanglingUpdates(lntypes.Local) { - err = l.quiescer.SendOwedStfu() - if err != nil { - l.stfuFailf("sendOwedStfu: %v", err.Error()) - } - } - - // Now that we have finished processing the incoming CommitSig - // and sent out our RevokeAndAck, we invoke the flushHooks if - // the channel state is clean. - l.RWMutex.Lock() - if l.channel.IsChannelClean() { - l.flushHooks.invoke() - } - l.RWMutex.Unlock() - case *lnwire.RevokeAndAck: // We've received a revocation from the remote chain, if valid, // this moves the remote chain forward, and expands our @@ -4694,3 +4528,176 @@ func (l *channelLink) processRemoteUpdateFailHTLC( return nil } + +// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteCommitSig(ctx context.Context, + msg *lnwire.CommitSig) error { + + // Since we may have learned new preimages for the first time, we'll add + // them to our preimage cache. By doing this, we ensure any contested + // contracts watched by any on-chain arbitrators can now sweep this HTLC + // on-chain. 
We delay committing the preimages until just before + // accepting the new remote commitment, as afterwards the peer won't + // resend the Settle messages on the next channel reestablishment. Doing + // so allows us to more effectively batch this operation, instead of + // doing a single write per preimage. + err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) + if err != nil { + l.failf( + LinkFailureError{code: ErrInternalError}, + "unable to add preimages=%v to cache: %v", + l.uncommittedPreimages, err, + ) + + return err + } + + // Instead of truncating the slice to conserve memory allocations, we + // simply set the uncommitted preimage slice to nil so that a new one + // will be initialized if any more witnesses are discovered. We do this + // because the maximum size that the slice can occupy is 15KB, and we + // want to ensure we release that memory back to the runtime. + l.uncommittedPreimages = nil + + // We just received a new updates to our local commitment chain, + // validate this new commitment, closing the link if invalid. + auxSigBlob, err := msg.CustomRecords.Serialize() + if err != nil { + l.failf( + LinkFailureError{code: ErrInvalidCommitment}, + "unable to serialize custom records: %v", err, + ) + + return err + } + err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ + CommitSig: msg.CommitSig, + HtlcSigs: msg.HtlcSigs, + PartialSig: msg.PartialSig, + AuxSigBlob: auxSigBlob, + }) + if err != nil { + // If we were unable to reconstruct their proposed commitment, + // then we'll examine the type of error. If it's an + // InvalidCommitSigError, then we'll send a direct error. + var sendData []byte + switch err.(type) { + case *lnwallet.InvalidCommitSigError: + sendData = []byte(err.Error()) + case *lnwallet.InvalidHtlcSigError: + sendData = []byte(err.Error()) + } + l.failf( + LinkFailureError{ + code: ErrInvalidCommitment, + FailureAction: LinkFailureForceClose, + SendData: sendData, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As we've just accepted a new state, we'll now immediately send the + // remote peer a revocation for our prior state. + nextRevocation, currentHtlcs, finalHTLCs, err := + l.channel.RevokeCurrentCommitment() + if err != nil { + l.log.Errorf("unable to revoke commitment: %v", err) + + // We need to fail the channel in case revoking our local + // commitment does not succeed. We might have already advanced + // our channel state which would lead us to proceed with an + // unclean state. + // + // NOTE: We do not trigger a force close because this could + // resolve itself in case our db was just busy not accepting new + // transactions. + l.failf( + LinkFailureError{ + code: ErrInternalError, + Warning: true, + FailureAction: LinkFailureDisconnect, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As soon as we are ready to send our next revocation, we can invoke + // the incoming commit hooks. + l.RWMutex.Lock() + l.incomingCommitHooks.invoke() + l.RWMutex.Unlock() + + l.cfg.Peer.SendMessage(false, nextRevocation) + + // Notify the incoming htlcs of which the resolutions were locked in. 
+ for id, settled := range finalHTLCs { + l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( + models.CircuitKey{ + ChanID: l.ShortChanID(), + HtlcID: id, + }, + channeldb.FinalHtlcInfo{ + Settled: settled, + Offchain: true, + }, + ) + } + + // Since we just revoked our commitment, we may have a new set of HTLC's + // on our commitment, so we'll send them using our function closure + // NotifyContractUpdate. + newUpdate := &contractcourt.ContractUpdate{ + HtlcKey: contractcourt.LocalHtlcSet, + Htlcs: currentHtlcs, + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + return fmt.Errorf("unable to notify contract update: %w", err) + } + + select { + case <-l.cg.Done(): + return nil + default: + } + + // If the remote party initiated the state transition, we'll reply with + // a signature to provide them with their version of the latest + // commitment. Otherwise, both commitment chains are fully synced from + // our PoV, then we don't need to reply with a signature as both sides + // already have a commitment with the latest accepted. + if l.channel.OweCommitment() { + if !l.updateCommitTxOrFail(ctx) { + return nil + } + } + + // If we need to send out an Stfu, this would be the time to do so. + if l.noDanglingUpdates(lntypes.Local) { + err = l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("sendOwedStfu: %v", err.Error()) + } + } + + // Now that we have finished processing the incoming CommitSig and sent + // out our RevokeAndAck, we invoke the flushHooks if the channel state + // is clean. + l.RWMutex.Lock() + if l.channel.IsChannelClean() { + l.flushHooks.invoke() + } + l.RWMutex.Unlock() + + return nil +} From 4017104475a9791312ccf36e8c731fb6a779490c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:44:42 +0800 Subject: [PATCH 13/22] htlcswitch: add `processRemoteRevokeAndAck` --- htlcswitch/link.go | 204 ++++++++++++++++++++++++--------------------- 1 file changed, 107 insertions(+), 97 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 72b4a10e9..2cbcfa0bf 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1961,107 +1961,12 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.RevokeAndAck: - // We've received a revocation from the remote chain, if valid, - // this moves the remote chain forward, and expands our - // revocation window. - - // We now process the message and advance our remote commit - // chain. - fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg) + err := l.processRemoteRevokeAndAck(ctx, msg) if err != nil { - // TODO(halseth): force close? - l.failf( - LinkFailureError{ - code: ErrInvalidRevocation, - FailureAction: LinkFailureDisconnect, - }, - "unable to accept revocation: %v", err, - ) + l.log.Errorf("failed to process remote revoke: %v", err) return } - // The remote party now has a new primary commitment, so we'll - // update the contract court to be aware of this new set (the - // prior old remote pending). - newUpdate := &contractcourt.ContractUpdate{ - HtlcKey: contractcourt.RemoteHtlcSet, - Htlcs: remoteHTLCs, - } - err = l.cfg.NotifyContractUpdate(newUpdate) - if err != nil { - l.log.Errorf("unable to notify contract update: %v", - err) - return - } - - select { - case <-l.cg.Done(): - return - default: - } - - // If we have a tower client for this channel type, we'll - // create a backup for the current state. 
- if l.cfg.TowerClient != nil { - state := l.channel.State() - chanID := l.ChanID() - - err = l.cfg.TowerClient.BackupState( - &chanID, state.RemoteCommitment.CommitHeight-1, - ) - if err != nil { - l.failf(LinkFailureError{ - code: ErrInternalError, - }, "unable to queue breach backup: %v", err) - return - } - } - - // If we can send updates then we can process adds in case we - // are the exit hop and need to send back resolutions, or in - // case there are validity issues with the packets. Otherwise - // we defer the action until resume. - // - // We are free to process the settles and fails without this - // check since processing those can't result in further updates - // to this channel link. - if l.quiescer.CanSendUpdates() { - l.processRemoteAdds(fwdPkg) - } else { - l.quiescer.OnResume(func() { - l.processRemoteAdds(fwdPkg) - }) - } - l.processRemoteSettleFails(fwdPkg) - - // If the link failed during processing the adds, we must - // return to ensure we won't attempted to update the state - // further. - if l.failed { - return - } - - // The revocation window opened up. If there are pending local - // updates, try to update the commit tx. Pending updates could - // already have been present because of a previously failed - // update to the commit tx or freshly added in by - // processRemoteAdds. Also in case there are no local updates, - // but there are still remote updates that are not in the remote - // commit tx yet, send out an update. - if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail(ctx) { - return - } - } - - // Now that we have finished processing the RevokeAndAck, we - // can invoke the flushHooks if the channel state is clean. - l.RWMutex.Lock() - if l.channel.IsChannelClean() { - l.flushHooks.invoke() - } - l.RWMutex.Unlock() - case *lnwire.UpdateFee: // Check and see if their proposed fee-rate would make us // exceed the fee threshold. @@ -4701,3 +4606,108 @@ func (l *channelLink) processRemoteCommitSig(ctx context.Context, return nil } + +// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context, + msg *lnwire.RevokeAndAck) error { + + // We've received a revocation from the remote chain, if valid, this + // moves the remote chain forward, and expands our revocation window. + + // We now process the message and advance our remote commit chain. + fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg) + if err != nil { + // TODO(halseth): force close? + l.failf( + LinkFailureError{ + code: ErrInvalidRevocation, + FailureAction: LinkFailureDisconnect, + }, + "unable to accept revocation: %v", err, + ) + + return err + } + + // The remote party now has a new primary commitment, so we'll update + // the contract court to be aware of this new set (the prior old remote + // pending). + newUpdate := &contractcourt.ContractUpdate{ + HtlcKey: contractcourt.RemoteHtlcSet, + Htlcs: remoteHTLCs, + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + return fmt.Errorf("unable to notify contract update: %w", err) + } + + select { + case <-l.cg.Done(): + return nil + default: + } + + // If we have a tower client for this channel type, we'll create a + // backup for the current state. 
+ if l.cfg.TowerClient != nil { + state := l.channel.State() + chanID := l.ChanID() + + err = l.cfg.TowerClient.BackupState( + &chanID, state.RemoteCommitment.CommitHeight-1, + ) + if err != nil { + l.failf(LinkFailureError{ + code: ErrInternalError, + }, "unable to queue breach backup: %v", err) + + return err + } + } + + // If we can send updates then we can process adds in case we are the + // exit hop and need to send back resolutions, or in case there are + // validity issues with the packets. Otherwise we defer the action until + // resume. + // + // We are free to process the settles and fails without this check since + // processing those can't result in further updates to this channel + // link. + if l.quiescer.CanSendUpdates() { + l.processRemoteAdds(fwdPkg) + } else { + l.quiescer.OnResume(func() { + l.processRemoteAdds(fwdPkg) + }) + } + l.processRemoteSettleFails(fwdPkg) + + // If the link failed during processing the adds, we must return to + // ensure we won't attempted to update the state further. + if l.failed { + return nil + } + + // The revocation window opened up. If there are pending local updates, + // try to update the commit tx. Pending updates could already have been + // present because of a previously failed update to the commit tx or + // freshly added in by processRemoteAdds. Also in case there are no + // local updates, but there are still remote updates that are not in the + // remote commit tx yet, send out an update. + if l.channel.OweCommitment() { + if !l.updateCommitTxOrFail(ctx) { + return nil + } + } + + // Now that we have finished processing the RevokeAndAck, we can invoke + // the flushHooks if the channel state is clean. + l.RWMutex.Lock() + if l.channel.IsChannelClean() { + l.flushHooks.invoke() + } + l.RWMutex.Unlock() + + return nil +} From 87b490dfc4e817973450a3659b2042dcedc6029b Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:48:52 +0800 Subject: [PATCH 14/22] htlcswitch: add `processRemoteError` and `processRemoteUpdateFee` --- htlcswitch/link.go | 116 ++++++++++++++++++++++++++------------------- 1 file changed, 66 insertions(+), 50 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2cbcfa0bf..759095847 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1903,8 +1903,6 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. -// -//nolint:funlen func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg lnwire.Message) { @@ -1968,41 +1966,14 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.UpdateFee: - // Check and see if their proposed fee-rate would make us - // exceed the fee threshold. - fee := chainfee.SatPerKWeight(msg.FeePerKw) - - isDust, err := l.exceedsFeeExposureLimit(fee) + err := l.processRemoteUpdateFee(msg) if err != nil { - // This shouldn't typically happen. If it does, it - // indicates something is wrong with our channel state. - l.log.Errorf("Unable to determine if fee threshold " + - "exceeded") - l.failf(LinkFailureError{code: ErrInternalError}, - "error calculating fee exposure: %v", err) + l.log.Errorf("failed to process remote update fee: %v", + err) return } - if isDust { - // The proposed fee-rate makes us exceed the fee - // threshold. 
- l.failf(LinkFailureError{code: ErrInternalError}, - "fee threshold exceeded: %v", err) - return - } - - // We received fee update from peer. If we are the initiator we - // will fail the channel, if not we will apply the update. - if err := l.channel.ReceiveUpdateFee(fee); err != nil { - l.failf(LinkFailureError{code: ErrInvalidUpdate}, - "error receiving fee update: %v", err) - return - } - - // Update the mailbox's feerate as well. - l.mailBox.SetFeeRate(fee) - case *lnwire.Stfu: err := l.handleStfu(msg) if err != nil { @@ -2017,28 +1988,11 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, msg.Warning()) case *lnwire.Error: - // Error received from remote, MUST fail channel, but should - // only print the contents of the error message if all - // characters are printable ASCII. - l.failf( - LinkFailureError{ - code: ErrRemoteError, + l.processRemoteError(msg) - // TODO(halseth): we currently don't fail the - // channel permanently, as there are some sync - // issues with other implementations that will - // lead to them sending an error message, but - // we can recover from on next connection. See - // https://github.com/ElementsProject/lightning/issues/4212 - PermanentFailure: false, - }, - "ChannelPoint(%v): received error from peer: %v", - l.channel.ChannelPoint(), msg.Error(), - ) default: l.log.Warnf("received unknown message of type %T", msg) } - } // handleStfu implements the top-level logic for handling the Stfu message from @@ -4711,3 +4665,65 @@ func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context, return nil } + +// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error { + // Check and see if their proposed fee-rate would make us exceed the fee + // threshold. + fee := chainfee.SatPerKWeight(msg.FeePerKw) + + isDust, err := l.exceedsFeeExposureLimit(fee) + if err != nil { + // This shouldn't typically happen. If it does, it indicates + // something is wrong with our channel state. + l.log.Errorf("Unable to determine if fee threshold " + + "exceeded") + l.failf(LinkFailureError{code: ErrInternalError}, + "error calculating fee exposure: %v", err) + + return err + } + + if isDust { + // The proposed fee-rate makes us exceed the fee threshold. + l.failf(LinkFailureError{code: ErrInternalError}, + "fee threshold exceeded: %v", err) + return err + } + + // We received fee update from peer. If we are the initiator we will + // fail the channel, if not we will apply the update. + if err := l.channel.ReceiveUpdateFee(fee); err != nil { + l.failf(LinkFailureError{code: ErrInvalidUpdate}, + "error receiving fee update: %v", err) + return err + } + + // Update the mailbox's feerate as well. + l.mailBox.SetFeeRate(fee) + + return nil +} + +// processRemoteError takes an `Error` msg sent from the remote and fails the +// channel link. +func (l *channelLink) processRemoteError(msg *lnwire.Error) { + // Error received from remote, MUST fail channel, but should only print + // the contents of the error message if all characters are printable + // ASCII. + l.failf( + // TODO(halseth): we currently don't fail the channel + // permanently, as there are some sync issues with other + // implementations that will lead to them sending an + // error message, but we can recover from on next + // connection. 
See + // https://github.com/ElementsProject/lightning/issues/4212 + LinkFailureError{ + code: ErrRemoteError, + PermanentFailure: false, + }, + "ChannelPoint(%v): received error from peer: %v", + l.channel.ChannelPoint(), msg.Error(), + ) +} From c42bf9b54530e935de84375bd6f856beeeb5aa4e Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 2 Jul 2025 21:47:00 +0800 Subject: [PATCH 15/22] htlcswitch: log err in the end in `handleUpstreamMsg` --- htlcswitch/link.go | 59 ++++++++++++---------------------------------- 1 file changed, 15 insertions(+), 44 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 759095847..ac880a7f4 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1916,66 +1916,32 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, return } + var err error + switch msg := msg.(type) { case *lnwire.UpdateAddHTLC: - err := l.processRemoteUpdateAddHTLC(msg) - if err != nil { - l.log.Errorf("failed to process remote ADD: %v", err) - return - } + err = l.processRemoteUpdateAddHTLC(msg) case *lnwire.UpdateFulfillHTLC: - err := l.processRemoteUpdateFulfillHTLC(msg) - if err != nil { - l.log.Errorf("failed to process remote fulfill: %v", - err) - - return - } + err = l.processRemoteUpdateFulfillHTLC(msg) case *lnwire.UpdateFailMalformedHTLC: - err := l.processRemoteUpdateFailMalformedHTLC(msg) - if err != nil { - l.log.Errorf("failed to process remote fail "+ - "malformed: %v", err) - - return - } + err = l.processRemoteUpdateFailMalformedHTLC(msg) case *lnwire.UpdateFailHTLC: - err := l.processRemoteUpdateFailHTLC(msg) - if err != nil { - l.log.Errorf("failed to process remote fail: %v", err) - return - } + err = l.processRemoteUpdateFailHTLC(msg) case *lnwire.CommitSig: - err := l.processRemoteCommitSig(ctx, msg) - if err != nil { - l.log.Errorf("failed to process remote commit sig: %v", - err) - - return - } + err = l.processRemoteCommitSig(ctx, msg) case *lnwire.RevokeAndAck: - err := l.processRemoteRevokeAndAck(ctx, msg) - if err != nil { - l.log.Errorf("failed to process remote revoke: %v", err) - return - } + err = l.processRemoteRevokeAndAck(ctx, msg) case *lnwire.UpdateFee: - err := l.processRemoteUpdateFee(msg) - if err != nil { - l.log.Errorf("failed to process remote update fee: %v", - err) - - return - } + err = l.processRemoteUpdateFee(msg) case *lnwire.Stfu: - err := l.handleStfu(msg) + err = l.handleStfu(msg) if err != nil { l.stfuFailf("handleStfu: %v", err.Error()) } @@ -1993,6 +1959,11 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, default: l.log.Warnf("received unknown message of type %T", msg) } + + if err != nil { + l.log.Errorf("failed to process remote %v: %v", msg.MsgType(), + err) + } } // handleStfu implements the top-level logic for handling the Stfu message from From 765714e750fa8864dc27872ba8fd79eecfcc90f4 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 22:33:14 +0800 Subject: [PATCH 16/22] htlcswitch: add `processLocalUpdateFulfillHTLC` --- htlcswitch/link.go | 130 ++++++++++++++++++++++----------------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index ac880a7f4..bf2c0e266 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1667,71 +1667,7 @@ func (l *channelLink) handleDownstreamPkt(ctx context.Context, _ = l.handleDownstreamUpdateAdd(ctx, pkt) case *lnwire.UpdateFulfillHTLC: - // If hodl.SettleOutgoing mode is active, we exit early to - // simulate arbitrary delays between the switch adding the - // SETTLE 
to the mailbox, and the HTLC being added to the - // commitment state. - if l.cfg.HodlMask.Active(hodl.SettleOutgoing) { - l.log.Warnf(hodl.SettleOutgoing.Warning()) - l.mailBox.AckPacket(pkt.inKey()) - return - } - - // An HTLC we forward to the switch has just settled somewhere - // upstream. Therefore we settle the HTLC within the our local - // state machine. - inKey := pkt.inKey() - err := l.channel.SettleHTLC( - htlc.PaymentPreimage, - pkt.incomingHTLCID, - pkt.sourceRef, - pkt.destRef, - &inKey, - ) - if err != nil { - l.log.Errorf("unable to settle incoming HTLC for "+ - "circuit-key=%v: %v", inKey, err) - - // If the HTLC index for Settle response was not known - // to our commitment state, it has already been - // cleaned up by a prior response. We'll thus try to - // clean up any lingering state to ensure we don't - // continue reforwarding. - if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { - l.cleanupSpuriousResponse(pkt) - } - - // Remove the packet from the link's mailbox to ensure - // it doesn't get replayed after a reconnection. - l.mailBox.AckPacket(inKey) - - return - } - - l.log.Debugf("queueing removal of SETTLE closed circuit: "+ - "%s->%s", pkt.inKey(), pkt.outKey()) - - l.closedCircuits = append(l.closedCircuits, pkt.inKey()) - - // With the HTLC settled, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // canceled. - htlc.ChanID = l.ChanID() - htlc.ID = pkt.incomingHTLCID - - // Then we send the HTLC settle message to the connected peer - // so we can continue the propagation of the settle message. - l.cfg.Peer.SendMessage(false, htlc) - - // Send a settle event notification to htlcNotifier. - l.cfg.HtlcNotifier.NotifySettleEvent( - newHtlcKey(pkt), - htlc.PaymentPreimage, - getEventType(pkt), - ) - - // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail(ctx) + l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc) case *lnwire.UpdateFailHTLC: // If hodl.FailOutgoing mode is active, we exit early to @@ -4698,3 +4634,67 @@ func (l *channelLink) processRemoteError(msg *lnwire.Error) { l.channel.ChannelPoint(), msg.Error(), ) } + +// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local and +// processes it. +func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context, + pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) { + + // If hodl.SettleOutgoing mode is active, we exit early to simulate + // arbitrary delays between the switch adding the SETTLE to the mailbox, + // and the HTLC being added to the commitment state. + if l.cfg.HodlMask.Active(hodl.SettleOutgoing) { + l.log.Warnf(hodl.SettleOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) + + return + } + + // An HTLC we forward to the switch has just settled somewhere upstream. + // Therefore we settle the HTLC within the our local state machine. + inKey := pkt.inKey() + err := l.channel.SettleHTLC( + htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef, + pkt.destRef, &inKey, + ) + if err != nil { + l.log.Errorf("unable to settle incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Settle response was not known to our + // commitment state, it has already been cleaned up by a prior + // response. We'll thus try to clean up any lingering state to + // ensure we don't continue reforwarding. 
+ if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure it + // doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + + return + } + + l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s", + pkt.inKey(), pkt.outKey()) + + l.closedCircuits = append(l.closedCircuits, pkt.inKey()) + + // With the HTLC settled, we'll need to populate the wire message to + // target the specific channel and HTLC to be canceled. + htlc.ChanID = l.ChanID() + htlc.ID = pkt.incomingHTLCID + + // Then we send the HTLC settle message to the connected peer so we can + // continue the propagation of the settle message. + l.cfg.Peer.SendMessage(false, htlc) + + // Send a settle event notification to htlcNotifier. + l.cfg.HtlcNotifier.NotifySettleEvent( + newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt), + ) + + // Immediately update the commitment tx to minimize latency. + l.updateCommitTxOrFail(ctx) +} From 31e166a6b3dfa724d6586e10baf35277501e5f02 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 22:36:10 +0800 Subject: [PATCH 17/22] htlcswitch: add `processLocalUpdateFaillHTLC` --- htlcswitch/link.go | 170 ++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 88 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index bf2c0e266..209203411 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1670,94 +1670,7 @@ func (l *channelLink) handleDownstreamPkt(ctx context.Context, l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc) case *lnwire.UpdateFailHTLC: - // If hodl.FailOutgoing mode is active, we exit early to - // simulate arbitrary delays between the switch adding a FAIL to - // the mailbox, and the HTLC being added to the commitment - // state. - if l.cfg.HodlMask.Active(hodl.FailOutgoing) { - l.log.Warnf(hodl.FailOutgoing.Warning()) - l.mailBox.AckPacket(pkt.inKey()) - return - } - - // An HTLC cancellation has been triggered somewhere upstream, - // we'll remove then HTLC from our local state machine. - inKey := pkt.inKey() - err := l.channel.FailHTLC( - pkt.incomingHTLCID, - htlc.Reason, - pkt.sourceRef, - pkt.destRef, - &inKey, - ) - if err != nil { - l.log.Errorf("unable to cancel incoming HTLC for "+ - "circuit-key=%v: %v", inKey, err) - - // If the HTLC index for Fail response was not known to - // our commitment state, it has already been cleaned up - // by a prior response. We'll thus try to clean up any - // lingering state to ensure we don't continue - // reforwarding. - if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { - l.cleanupSpuriousResponse(pkt) - } - - // Remove the packet from the link's mailbox to ensure - // it doesn't get replayed after a reconnection. - l.mailBox.AckPacket(inKey) - - return - } - - l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s", - pkt.inKey(), pkt.outKey()) - - l.closedCircuits = append(l.closedCircuits, pkt.inKey()) - - // With the HTLC removed, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // canceled. The "Reason" field will have already been set - // within the switch. - htlc.ChanID = l.ChanID() - htlc.ID = pkt.incomingHTLCID - - // We send the HTLC message to the peer which initially created - // the HTLC. If the incoming blinding point is non-nil, we - // know that we are a relaying node in a blinded path. - // Otherwise, we're either an introduction node or not part of - // a blinded path at all. 
- if err := l.sendIncomingHTLCFailureMsg( - htlc.ID, - pkt.obfuscator, - htlc.Reason, - ); err != nil { - l.log.Errorf("unable to send HTLC failure: %v", - err) - - return - } - - // If the packet does not have a link failure set, it failed - // further down the route so we notify a forwarding failure. - // Otherwise, we notify a link failure because it failed at our - // node. - if pkt.linkFailure != nil { - l.cfg.HtlcNotifier.NotifyLinkFailEvent( - newHtlcKey(pkt), - newHtlcInfo(pkt), - getEventType(pkt), - pkt.linkFailure, - false, - ) - } else { - l.cfg.HtlcNotifier.NotifyForwardingFailEvent( - newHtlcKey(pkt), getEventType(pkt), - ) - } - - // Immediately update the commitment tx to minimize latency. - l.updateCommitTxOrFail(ctx) + l.processLocalUpdateFailHTLC(ctx, pkt, htlc) } } @@ -4698,3 +4611,84 @@ func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context, // Immediately update the commitment tx to minimize latency. l.updateCommitTxOrFail(ctx) } + +// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local and +// processes it. +func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context, + pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) { + + // If hodl.FailOutgoing mode is active, we exit early to simulate + // arbitrary delays between the switch adding a FAIL to the mailbox, and + // the HTLC being added to the commitment state. + if l.cfg.HodlMask.Active(hodl.FailOutgoing) { + l.log.Warnf(hodl.FailOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) + + return + } + + // An HTLC cancellation has been triggered somewhere upstream, we'll + // remove then HTLC from our local state machine. + inKey := pkt.inKey() + err := l.channel.FailHTLC( + pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef, + &inKey, + ) + if err != nil { + l.log.Errorf("unable to cancel incoming HTLC for "+ + "circuit-key=%v: %v", inKey, err) + + // If the HTLC index for Fail response was not known to our + // commitment state, it has already been cleaned up by a prior + // response. We'll thus try to clean up any lingering state to + // ensure we don't continue reforwarding. + if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + l.cleanupSpuriousResponse(pkt) + } + + // Remove the packet from the link's mailbox to ensure it + // doesn't get replayed after a reconnection. + l.mailBox.AckPacket(inKey) + + return + } + + l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s", + pkt.inKey(), pkt.outKey()) + + l.closedCircuits = append(l.closedCircuits, pkt.inKey()) + + // With the HTLC removed, we'll need to populate the wire message to + // target the specific channel and HTLC to be canceled. The "Reason" + // field will have already been set within the switch. + htlc.ChanID = l.ChanID() + htlc.ID = pkt.incomingHTLCID + + // We send the HTLC message to the peer which initially created the + // HTLC. If the incoming blinding point is non-nil, we know that we are + // a relaying node in a blinded path. Otherwise, we're either an + // introduction node or not part of a blinded path at all. + err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason) + if err != nil { + l.log.Errorf("unable to send HTLC failure: %v", err) + + return + } + + // If the packet does not have a link failure set, it failed further + // down the route so we notify a forwarding failure. Otherwise, we + // notify a link failure because it failed at our node. 
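+	// Both notifications go through the HtlcNotifier, which feeds HTLC
+	// event subscribers (e.g. the SubscribeHtlcEvents RPC).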
+ if pkt.linkFailure != nil { + l.cfg.HtlcNotifier.NotifyLinkFailEvent( + newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt), + pkt.linkFailure, false, + ) + } else { + l.cfg.HtlcNotifier.NotifyForwardingFailEvent( + newHtlcKey(pkt), getEventType(pkt), + ) + } + + // Immediately update the commitment tx to minimize latency. + l.updateCommitTxOrFail(ctx) +} From 2bb8b90afd05135aa3af1a25028f0f2b95fd6b87 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 22:41:20 +0800 Subject: [PATCH 18/22] htlcswitch: fix `error not checked` As required by the linter. --- htlcswitch/link.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 209203411..3014c7979 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3918,7 +3918,10 @@ func (l *channelLink) resumeLink(ctx context.Context) error { // ensure that all acknowledgments that occur during channel // resynchronization have taken affect, causing us only to pull unacked // packets after starting to read from the downstream mailbox. - l.mailBox.ResetPackets() + err := l.mailBox.ResetPackets() + if err != nil { + l.log.Errorf("failed to reset packets: %v", err) + } // If the channel is pending, there's no need to reforwarding packets. if l.ShortChanID() == hop.Source { @@ -3931,7 +3934,7 @@ func (l *channelLink) resumeLink(ctx context.Context) error { // only attempt to resolve packages if our short chan id indicates that // the channel is not pending, otherwise we should have no htlcs to // reforward. - err := l.resolveFwdPkgs(ctx) + err = l.resolveFwdPkgs(ctx) switch err { // No error was encountered, success. case nil: @@ -4316,7 +4319,10 @@ func (l *channelLink) processRemoteCommitSig(ctx context.Context, l.incomingCommitHooks.invoke() l.RWMutex.Unlock() - l.cfg.Peer.SendMessage(false, nextRevocation) + err = l.cfg.Peer.SendMessage(false, nextRevocation) + if err != nil { + l.log.Errorf("failed to send RevokeAndAck: %v", err) + } // Notify the incoming htlcs of which the resolutions were locked in. for id, settled := range finalHTLCs { @@ -4601,7 +4607,10 @@ func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context, // Then we send the HTLC settle message to the connected peer so we can // continue the propagation of the settle message. - l.cfg.Peer.SendMessage(false, htlc) + err = l.cfg.Peer.SendMessage(false, htlc) + if err != nil { + l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err) + } // Send a settle event notification to htlcNotifier. l.cfg.HtlcNotifier.NotifySettleEvent( From 1b6ca8827e542797cceae71cf27e1e6fb500e55c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 22:43:20 +0800 Subject: [PATCH 19/22] htlcswitch: fix linters Fix a few linter errors - these were not caught before as they were configured to be ignored. --- htlcswitch/link.go | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3014c7979..adaeafaf4 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1920,13 +1920,13 @@ func (l *channelLink) ackDownStreamPackets() error { // the link. func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool { err := l.updateCommitTx(ctx) - switch err { + switch { // No error encountered, success. - case nil: + case err == nil: // A duplicate keystone error should be resolved and is not fatal, so // we won't send an Error message to the peer. 
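	// NOTE: the case below now matches with errors.Is on a bare switch
	// rather than comparing the error value directly, so a wrapped
	// ErrDuplicateKeystone is still recognized.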
- case ErrDuplicateKeystone: + case errors.Is(err, ErrDuplicateKeystone): l.failf(LinkFailureError{code: ErrCircuitError}, "temporary circuit error: %v", err) return false @@ -3761,12 +3761,12 @@ func (l *channelLink) handleHtlcResolution(ctx context.Context, // No error, success. if err == nil { return nil - } - switch err { + + switch { // If the duplicate keystone error was encountered, fail back // gracefully. - case ErrDuplicateKeystone: + case errors.Is(err, ErrDuplicateKeystone): l.failf( LinkFailureError{ code: ErrCircuitError, @@ -3935,9 +3935,9 @@ func (l *channelLink) resumeLink(ctx context.Context) error { // the channel is not pending, otherwise we should have no htlcs to // reforward. err = l.resolveFwdPkgs(ctx) - switch err { + switch { // No error was encountered, success. - case nil: + case err == nil: // With our link's in-memory state fully reconstructed, spawn a // goroutine to manage the reclamation of disk space occupied by // completed forwarding packages. @@ -3946,9 +3946,9 @@ func (l *channelLink) resumeLink(ctx context.Context) error { return nil - // If the duplicate keystone error was encountered, we'll fail - // without sending an Error message to the peer. - case ErrDuplicateKeystone: + // If the duplicate keystone error was encountered, we'll fail without + // sending an Error message to the peer. + case errors.Is(err, ErrDuplicateKeystone): l.failf(LinkFailureError{code: ErrCircuitError}, "temporary circuit error: %v", err) @@ -4264,10 +4264,10 @@ func (l *channelLink) processRemoteCommitSig(ctx context.Context, // then we'll examine the type of error. If it's an // InvalidCommitSigError, then we'll send a direct error. var sendData []byte - switch err.(type) { - case *lnwallet.InvalidCommitSigError: + switch { + case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err): sendData = []byte(err.Error()) - case *lnwallet.InvalidHtlcSigError: + case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err): sendData = []byte(err.Error()) } l.failf( @@ -4315,9 +4315,9 @@ func (l *channelLink) processRemoteCommitSig(ctx context.Context, // As soon as we are ready to send our next revocation, we can invoke // the incoming commit hooks. - l.RWMutex.Lock() + l.Lock() l.incomingCommitHooks.invoke() - l.RWMutex.Unlock() + l.Unlock() err = l.cfg.Peer.SendMessage(false, nextRevocation) if err != nil { @@ -4378,11 +4378,11 @@ func (l *channelLink) processRemoteCommitSig(ctx context.Context, // Now that we have finished processing the incoming CommitSig and sent // out our RevokeAndAck, we invoke the flushHooks if the channel state // is clean. - l.RWMutex.Lock() + l.Lock() if l.channel.IsChannelClean() { l.flushHooks.invoke() } - l.RWMutex.Unlock() + l.Unlock() return nil } @@ -4483,11 +4483,11 @@ func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context, // Now that we have finished processing the RevokeAndAck, we can invoke // the flushHooks if the channel state is clean. - l.RWMutex.Lock() + l.Lock() if l.channel.IsChannelClean() { l.flushHooks.invoke() } - l.RWMutex.Unlock() + l.Unlock() return nil } @@ -4584,7 +4584,7 @@ func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context, // commitment state, it has already been cleaned up by a prior // response. We'll thus try to clean up any lingering state to // ensure we don't continue reforwarding. 
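		// NOTE: the direct type assertion below is replaced with the
		// generic lnutils.ErrorAs helper which, like errors.As, should
		// also match a wrapped lnwallet.ErrUnknownHtlcIndex.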
- if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) { l.cleanupSpuriousResponse(pkt) } @@ -4651,7 +4651,7 @@ func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context, // commitment state, it has already been cleaned up by a prior // response. We'll thus try to clean up any lingering state to // ensure we don't continue reforwarding. - if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok { + if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) { l.cleanupSpuriousResponse(pkt) } From d9369ae1a8dc215f3dc5aa65f08cfb7ca9b435a6 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 1 Jul 2025 20:25:38 +0800 Subject: [PATCH 20/22] htlcswitch: remove unnecessary `prand.Seed` --- htlcswitch/link.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index adaeafaf4..206b8db60 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -36,10 +36,6 @@ import ( "github.com/lightningnetwork/lnd/tlv" ) -func init() { - prand.Seed(time.Now().UnixNano()) -} - const ( // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that // the node accepts for forwarded payments. The value is relative to the From d6c3400e4e8f404bfb497b1ab3a4d0162f7159f6 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 1 Jul 2025 20:24:37 +0800 Subject: [PATCH 21/22] docs: update release notes --- docs/release-notes/release-notes-0.20.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index ee36d7c8f..4d64a7ec6 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -109,6 +109,9 @@ circuit. The indices are only available for forwarding events saved after v0.20. for payments. Now the payment address is mandatory for the writer and reader of a payment request. +- [Refactored](https://github.com/lightningnetwork/lnd/pull/10018) `channelLink` + to improve readability and maintainability of the code. + ## Breaking Changes ## Performance Improvements From 240593a3994c88cf182a1192e184918fa4824d01 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 2 Jul 2025 16:23:14 +0800 Subject: [PATCH 22/22] htlcswitch: log the error returned from `SendMessage` --- htlcswitch/link.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 206b8db60..a677df1a5 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1018,7 +1018,11 @@ func (l *channelLink) syncChanStates(ctx context.Context) error { // immediately so we return to a synchronized state as soon as // possible. for _, msg := range msgsToReSend { - l.cfg.Peer.SendMessage(false, msg) + err := l.cfg.Peer.SendMessage(false, msg) + if err != nil { + l.log.Errorf("failed to send %v: %v", + msg.MsgType(), err) + } } case <-l.cg.Done(): @@ -1619,7 +1623,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context, l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) - _ = l.cfg.Peer.SendMessage(false, htlc) + err = l.cfg.Peer.SendMessage(false, htlc) + if err != nil { + l.log.Errorf("failed to send UpdateAddHTLC: %v", err) + } // Send a forward event notification to htlcNotifier. 
l.cfg.HtlcNotifier.NotifyForwardingEvent( @@ -2021,7 +2028,10 @@ func (l *channelLink) updateCommitTx(ctx context.Context) error { PartialSig: newCommit.PartialSig, CustomRecords: auxBlobRecords, } - l.cfg.Peer.SendMessage(false, commitSig) + err = l.cfg.Peer.SendMessage(false, commitSig) + if err != nil { + l.log.Errorf("failed to send CommitSig: %v", err) + } // Now that we have sent out a new CommitSig, we invoke the outgoing set // of commit hooks. @@ -3485,11 +3495,14 @@ func (l *channelLink) settleHTLC(preimage lntypes.Preimage, // HTLC was successfully settled locally send notification about it // remote peer. - l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{ + err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{ ChanID: l.ChanID(), ID: htlcIndex, PaymentPreimage: preimage, }) + if err != nil { + l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err) + } // Once we have successfully settled the htlc, notify a settle event. l.cfg.HtlcNotifier.NotifySettleEvent( @@ -3684,12 +3697,15 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, return } - l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{ + err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{ ChanID: l.ChanID(), ID: htlcIndex, ShaOnionBlob: shaOnionBlob, FailureCode: code, }) + if err != nil { + l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err) + } } // failf is a function which is used to encapsulate the action necessary for