From 04abf96f60fd564061f06bc2da4d54da19396dce Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 30 Jun 2025 21:41:41 +0800 Subject: [PATCH] htlcswitch: add `processRemoteCommitSig` --- htlcswitch/link.go | 345 +++++++++++++++++++++++---------------------- 1 file changed, 176 insertions(+), 169 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 444ff82a1..72b4a10e9 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1952,180 +1952,14 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context, } case *lnwire.CommitSig: - // Since we may have learned new preimages for the first time, - // we'll add them to our preimage cache. By doing this, we - // ensure any contested contracts watched by any on-chain - // arbitrators can now sweep this HTLC on-chain. We delay - // committing the preimages until just before accepting the new - // remote commitment, as afterwards the peer won't resend the - // Settle messages on the next channel reestablishment. Doing so - // allows us to more effectively batch this operation, instead - // of doing a single write per preimage. - err := l.cfg.PreimageCache.AddPreimages( - l.uncommittedPreimages..., - ) + err := l.processRemoteCommitSig(ctx, msg) if err != nil { - l.failf( - LinkFailureError{code: ErrInternalError}, - "unable to add preimages=%v to cache: %v", - l.uncommittedPreimages, err, - ) - return - } - - // Instead of truncating the slice to conserve memory - // allocations, we simply set the uncommitted preimage slice to - // nil so that a new one will be initialized if any more - // witnesses are discovered. We do this because the maximum size - // that the slice can occupy is 15KB, and we want to ensure we - // release that memory back to the runtime. - l.uncommittedPreimages = nil - - // We just received a new updates to our local commitment - // chain, validate this new commitment, closing the link if - // invalid. - auxSigBlob, err := msg.CustomRecords.Serialize() - if err != nil { - l.failf( - LinkFailureError{code: ErrInvalidCommitment}, - "unable to serialize custom records: %v", err, - ) - - return - } - err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ - CommitSig: msg.CommitSig, - HtlcSigs: msg.HtlcSigs, - PartialSig: msg.PartialSig, - AuxSigBlob: auxSigBlob, - }) - if err != nil { - // If we were unable to reconstruct their proposed - // commitment, then we'll examine the type of error. If - // it's an InvalidCommitSigError, then we'll send a - // direct error. - var sendData []byte - switch err.(type) { - case *lnwallet.InvalidCommitSigError: - sendData = []byte(err.Error()) - case *lnwallet.InvalidHtlcSigError: - sendData = []byte(err.Error()) - } - l.failf( - LinkFailureError{ - code: ErrInvalidCommitment, - FailureAction: LinkFailureForceClose, - SendData: sendData, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As we've just accepted a new state, we'll now - // immediately send the remote peer a revocation for our prior - // state. - nextRevocation, currentHtlcs, finalHTLCs, err := - l.channel.RevokeCurrentCommitment() - if err != nil { - l.log.Errorf("unable to revoke commitment: %v", err) - - // We need to fail the channel in case revoking our - // local commitment does not succeed. We might have - // already advanced our channel state which would lead - // us to proceed with an unclean state. - // - // NOTE: We do not trigger a force close because this - // could resolve itself in case our db was just busy - // not accepting new transactions. - l.failf( - LinkFailureError{ - code: ErrInternalError, - Warning: true, - FailureAction: LinkFailureDisconnect, - }, - "ChannelPoint(%v): unable to accept new "+ - "commitment: %v", - l.channel.ChannelPoint(), err, - ) - return - } - - // As soon as we are ready to send our next revocation, we can - // invoke the incoming commit hooks. - l.RWMutex.Lock() - l.incomingCommitHooks.invoke() - l.RWMutex.Unlock() - - l.cfg.Peer.SendMessage(false, nextRevocation) - - // Notify the incoming htlcs of which the resolutions were - // locked in. - for id, settled := range finalHTLCs { - l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( - models.CircuitKey{ - ChanID: l.ShortChanID(), - HtlcID: id, - }, - channeldb.FinalHtlcInfo{ - Settled: settled, - Offchain: true, - }, - ) - } - - // Since we just revoked our commitment, we may have a new set - // of HTLC's on our commitment, so we'll send them using our - // function closure NotifyContractUpdate. - newUpdate := &contractcourt.ContractUpdate{ - HtlcKey: contractcourt.LocalHtlcSet, - Htlcs: currentHtlcs, - } - err = l.cfg.NotifyContractUpdate(newUpdate) - if err != nil { - l.log.Errorf("unable to notify contract update: %v", + l.log.Errorf("failed to process remote commit sig: %v", err) + return } - select { - case <-l.cg.Done(): - return - default: - } - - // If the remote party initiated the state transition, - // we'll reply with a signature to provide them with their - // version of the latest commitment. Otherwise, both commitment - // chains are fully synced from our PoV, then we don't need to - // reply with a signature as both sides already have a - // commitment with the latest accepted. - if l.channel.OweCommitment() { - if !l.updateCommitTxOrFail(ctx) { - return - } - } - - // If we need to send out an Stfu, this would be the time to do - // so. - if l.noDanglingUpdates(lntypes.Local) { - err = l.quiescer.SendOwedStfu() - if err != nil { - l.stfuFailf("sendOwedStfu: %v", err.Error()) - } - } - - // Now that we have finished processing the incoming CommitSig - // and sent out our RevokeAndAck, we invoke the flushHooks if - // the channel state is clean. - l.RWMutex.Lock() - if l.channel.IsChannelClean() { - l.flushHooks.invoke() - } - l.RWMutex.Unlock() - case *lnwire.RevokeAndAck: // We've received a revocation from the remote chain, if valid, // this moves the remote chain forward, and expands our @@ -4694,3 +4528,176 @@ func (l *channelLink) processRemoteUpdateFailHTLC( return nil } + +// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and +// processes it. +func (l *channelLink) processRemoteCommitSig(ctx context.Context, + msg *lnwire.CommitSig) error { + + // Since we may have learned new preimages for the first time, we'll add + // them to our preimage cache. By doing this, we ensure any contested + // contracts watched by any on-chain arbitrators can now sweep this HTLC + // on-chain. We delay committing the preimages until just before + // accepting the new remote commitment, as afterwards the peer won't + // resend the Settle messages on the next channel reestablishment. Doing + // so allows us to more effectively batch this operation, instead of + // doing a single write per preimage. + err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...) + if err != nil { + l.failf( + LinkFailureError{code: ErrInternalError}, + "unable to add preimages=%v to cache: %v", + l.uncommittedPreimages, err, + ) + + return err + } + + // Instead of truncating the slice to conserve memory allocations, we + // simply set the uncommitted preimage slice to nil so that a new one + // will be initialized if any more witnesses are discovered. We do this + // because the maximum size that the slice can occupy is 15KB, and we + // want to ensure we release that memory back to the runtime. + l.uncommittedPreimages = nil + + // We just received a new updates to our local commitment chain, + // validate this new commitment, closing the link if invalid. + auxSigBlob, err := msg.CustomRecords.Serialize() + if err != nil { + l.failf( + LinkFailureError{code: ErrInvalidCommitment}, + "unable to serialize custom records: %v", err, + ) + + return err + } + err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{ + CommitSig: msg.CommitSig, + HtlcSigs: msg.HtlcSigs, + PartialSig: msg.PartialSig, + AuxSigBlob: auxSigBlob, + }) + if err != nil { + // If we were unable to reconstruct their proposed commitment, + // then we'll examine the type of error. If it's an + // InvalidCommitSigError, then we'll send a direct error. + var sendData []byte + switch err.(type) { + case *lnwallet.InvalidCommitSigError: + sendData = []byte(err.Error()) + case *lnwallet.InvalidHtlcSigError: + sendData = []byte(err.Error()) + } + l.failf( + LinkFailureError{ + code: ErrInvalidCommitment, + FailureAction: LinkFailureForceClose, + SendData: sendData, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As we've just accepted a new state, we'll now immediately send the + // remote peer a revocation for our prior state. + nextRevocation, currentHtlcs, finalHTLCs, err := + l.channel.RevokeCurrentCommitment() + if err != nil { + l.log.Errorf("unable to revoke commitment: %v", err) + + // We need to fail the channel in case revoking our local + // commitment does not succeed. We might have already advanced + // our channel state which would lead us to proceed with an + // unclean state. + // + // NOTE: We do not trigger a force close because this could + // resolve itself in case our db was just busy not accepting new + // transactions. + l.failf( + LinkFailureError{ + code: ErrInternalError, + Warning: true, + FailureAction: LinkFailureDisconnect, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) + + return err + } + + // As soon as we are ready to send our next revocation, we can invoke + // the incoming commit hooks. + l.RWMutex.Lock() + l.incomingCommitHooks.invoke() + l.RWMutex.Unlock() + + l.cfg.Peer.SendMessage(false, nextRevocation) + + // Notify the incoming htlcs of which the resolutions were locked in. + for id, settled := range finalHTLCs { + l.cfg.HtlcNotifier.NotifyFinalHtlcEvent( + models.CircuitKey{ + ChanID: l.ShortChanID(), + HtlcID: id, + }, + channeldb.FinalHtlcInfo{ + Settled: settled, + Offchain: true, + }, + ) + } + + // Since we just revoked our commitment, we may have a new set of HTLC's + // on our commitment, so we'll send them using our function closure + // NotifyContractUpdate. + newUpdate := &contractcourt.ContractUpdate{ + HtlcKey: contractcourt.LocalHtlcSet, + Htlcs: currentHtlcs, + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + return fmt.Errorf("unable to notify contract update: %w", err) + } + + select { + case <-l.cg.Done(): + return nil + default: + } + + // If the remote party initiated the state transition, we'll reply with + // a signature to provide them with their version of the latest + // commitment. Otherwise, both commitment chains are fully synced from + // our PoV, then we don't need to reply with a signature as both sides + // already have a commitment with the latest accepted. + if l.channel.OweCommitment() { + if !l.updateCommitTxOrFail(ctx) { + return nil + } + } + + // If we need to send out an Stfu, this would be the time to do so. + if l.noDanglingUpdates(lntypes.Local) { + err = l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("sendOwedStfu: %v", err.Error()) + } + } + + // Now that we have finished processing the incoming CommitSig and sent + // out our RevokeAndAck, we invoke the flushHooks if the channel state + // is clean. + l.RWMutex.Lock() + if l.channel.IsChannelClean() { + l.flushHooks.invoke() + } + l.RWMutex.Unlock() + + return nil +}