From fb95458a1bd53b931ff9b9f2231bffe6976b8351 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 11 Jun 2025 12:16:09 +0800 Subject: [PATCH 1/5] htlcswitch: skip checking replays for reforwarded packets We now rely on the forwarding package's state to decide whether a given packet is a reforwarding or not. If we know it's a reforwarding packet, there's no need to check for replays in the `sharedHashes` bucket, which behaves the same as if we are querying the `batchReplayBkt`. --- htlcswitch/hop/iterator.go | 14 ++++++++------ htlcswitch/link.go | 18 ++++++++++-------- htlcswitch/mock.go | 2 +- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/htlcswitch/hop/iterator.go b/htlcswitch/hop/iterator.go index 2cf2adb54..314f05209 100644 --- a/htlcswitch/hop/iterator.go +++ b/htlcswitch/hop/iterator.go @@ -745,7 +745,8 @@ func (r *DecodeHopIteratorResponse) Result() (Iterator, lnwire.FailCode) { // the presented readers and rhashes *NEVER* deviate across invocations for the // same id. func (p *OnionProcessor) DecodeHopIterators(id []byte, - reqs []DecodeHopIteratorRequest) ([]DecodeHopIteratorResponse, error) { + reqs []DecodeHopIteratorRequest, + reforward bool) ([]DecodeHopIteratorResponse, error) { var ( batchSize = len(reqs) @@ -864,11 +865,12 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, continue } - // If this index is contained in the replay set, mark it with a - // temporary channel failure error code. We infer that the - // offending error was due to a replayed packet because this - // index was found in the replay set. - if replays.Contains(uint16(i)) { + // If this index is contained in the replay set, and it is not a + // reforwarding on startup, mark it with a permanent channel + // failure error code. We infer that the offending error was due + // to a replayed packet because this index was found in the + // replay set. + if !reforward && replays.Contains(uint16(i)) { log.Errorf("unable to process onion packet: %v", sphinx.ErrReplayedPacket) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 72b506986..ec7cb1539 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -108,8 +108,10 @@ type ChannelLinkConfig struct { // blobs, which are then used to inform how to forward an HTLC. // // NOTE: This function assumes the same set of readers and preimages - // are always presented for the same identifier. - DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) ( + // are always presented for the same identifier. The last boolean is + // used to decide whether this is a reforwarding or not - when it's + // reforwarding, we skip the replay check enforced in our decay log. + DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) ( []hop.DecodeHopIteratorResponse, error) // ExtractErrorEncrypter function is responsible for decoding HTLC @@ -3764,12 +3766,14 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { } } + reforward := fwdPkg.State != channeldb.FwdStateLockedIn + // Atomically decode the incoming htlcs, simultaneously checking for // replay attempts. A particular index in the returned, spare list of // channel iterators should only be used if the failure code at the // same index is lnwire.FailCodeNone. decodeResps, sphinxErr := l.cfg.DecodeHopIterators( - fwdPkg.ID(), decodeReqs, + fwdPkg.ID(), decodeReqs, reforward, ) if sphinxErr != nil { l.failf(LinkFailureError{code: ErrInternalError}, @@ -4120,17 +4124,15 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { return } - replay := fwdPkg.State != channeldb.FwdStateLockedIn - - l.log.Debugf("forwarding %d packets to switch: replay=%v", - len(switchPackets), replay) + l.log.Debugf("forwarding %d packets to switch: reforward=%v", + len(switchPackets), reforward) // NOTE: This call is made synchronous so that we ensure all circuits // are committed in the exact order that they are processed in the link. // Failing to do this could cause reorderings/gaps in the range of // opened circuits, which violates assumptions made by the circuit // trimming. - l.forwardBatch(replay, switchPackets...) + l.forwardBatch(reforward, switchPackets...) } // experimentalEndorsement returns the value to set for our outgoing diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index cb814e95c..136d5c39b 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -522,7 +522,7 @@ func (p *mockIteratorDecoder) DecodeHopIterator(r io.Reader, rHash []byte, } func (p *mockIteratorDecoder) DecodeHopIterators(id []byte, - reqs []hop.DecodeHopIteratorRequest) ( + reqs []hop.DecodeHopIteratorRequest, _ bool) ( []hop.DecodeHopIteratorResponse, error) { idHash := sha256.Sum256(id) From 3b9c4eb23224df91c7aa0848929512b3452ee0fe Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 11 Jun 2025 12:26:50 +0800 Subject: [PATCH 2/5] htlcswitch: exit early if the `fwdPkg` is already completed --- htlcswitch/link.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index ec7cb1539..33bf1805d 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3632,6 +3632,13 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) { return } + // Exit early if the fwdPkg is already processed. + if fwdPkg.State == channeldb.FwdStateCompleted { + l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) + + return + } + l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter) var switchPackets []*htlcPacket @@ -3740,6 +3747,13 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { return } + // Exit early if the fwdPkg is already processed. + if fwdPkg.State == channeldb.FwdStateCompleted { + l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) + + return + } + l.log.Tracef("processing %d remote adds for height %d", len(fwdPkg.Adds), fwdPkg.Height) @@ -3766,7 +3780,9 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { } } - reforward := fwdPkg.State != channeldb.FwdStateLockedIn + // If the fwdPkg has already been processed, it means we are + // reforwarding the packets again, which happens only on a restart. + reforward := fwdPkg.State == channeldb.FwdStateProcessed // Atomically decode the incoming htlcs, simultaneously checking for // replay attempts. A particular index in the returned, spare list of From e3f95dcd091b2a36c8c20b11075f0ff4fb4e5978 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 11 Jun 2025 12:45:53 +0800 Subject: [PATCH 3/5] htlcswitch: remove `batchReplayBkt` This commit removes the `batchReplayBkt` as its only effect is to allow reforwarding htlcs during startup. Normally for every incoming htlc added, their shared secret is used as the key to be saved into the `sharedHashBucket`, which will be used for check for replays. In addition, the fwdPkg's ID, which is SCID+height is also saved to the bucket `batchReplayBkt`. Since replays of HTLCs cannot happen at the same commitment height, when a replay happens, `batchReplayBkt` simply doesn't have this info, and we again rely on `sharedHashBucket` to detect it. This means most of the time the `batchReplayBkt` is a list of SCID+height with empty values. The `batchReplayBkt` was previously used as a mechanism to check for reforwardings during startup - when reforwarding htlcs, it quries this bucket and finds an empty map, knowing this is a forwarding and skips the check in `sharedHashBucket`. Given now we use a bool flag to explicitly skip the replay check, this bucket is no longer useful. --- htlcswitch/decayedlog.go | 49 +++----------------------------------- htlcswitch/hop/iterator.go | 2 ++ 2 files changed, 5 insertions(+), 46 deletions(-) diff --git a/htlcswitch/decayedlog.go b/htlcswitch/decayedlog.go index ca7e19e7d..e92a45160 100644 --- a/htlcswitch/decayedlog.go +++ b/htlcswitch/decayedlog.go @@ -1,7 +1,6 @@ package htlcswitch import ( - "bytes" "encoding/binary" "errors" "fmt" @@ -24,11 +23,6 @@ var ( // bytes of a received HTLC's hashed shared secret as the key and the HTLC's // CLTV expiry as the value. sharedHashBucket = []byte("shared-hash") - - // batchReplayBucket is a bucket that maps batch identifiers to - // serialized ReplaySets. This is used to give idempotency in the event - // that a batch is processed more than once. - batchReplayBucket = []byte("batch-replay") ) var ( @@ -138,11 +132,6 @@ func (d *DecayedLog) initBuckets() error { return ErrDecayedLogInit } - _, err = tx.CreateTopLevelBucket(batchReplayBucket) - if err != nil { - return ErrDecayedLogInit - } - return nil }, func() {}) } @@ -329,11 +318,8 @@ func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error { // PutBatch accepts a pending batch of hashed secret entries to write to disk. // Each hashed secret is inserted with a corresponding time value, dictating // when the entry will be evicted from the log. -// NOTE: This method enforces idempotency by writing the replay set obtained -// from the first attempt for a particular batch ID, and decoding the return -// value to subsequent calls. For the indices of the replay set to be aligned -// properly, the batch MUST be constructed identically to the first attempt, -// pruning will cause the indices to become invalid. +// +// TODO(yy): remove this method and use `Put` instead. func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { // Since batched boltdb txns may be executed multiple times before // succeeding, we will create a new replay set for each invocation to @@ -348,25 +334,6 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { return ErrDecayedLogCorrupted } - // Load the batch replay bucket, which will be used to either - // retrieve the result of previously processing this batch, or - // to write the result of this operation. - batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket) - if batchReplayBkt == nil { - return ErrDecayedLogCorrupted - } - - // Check for the existence of this batch's id in the replay - // bucket. If a non-nil value is found, this indicates that we - // have already processed this batch before. We deserialize the - // resulting and return it to ensure calls to put batch are - // idempotent. - replayBytes := batchReplayBkt.Get(b.ID) - if replayBytes != nil { - replays = sphinx.NewReplaySet() - return replays.Decode(bytes.NewReader(replayBytes)) - } - // The CLTV will be stored into scratch and then stored into the // sharedHashBucket. var scratch [4]byte @@ -394,17 +361,7 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { // batch's construction. replays.Merge(b.ReplaySet) - // Write the replay set under the batch identifier to the batch - // replays bucket. This can be used during recovery to test (1) - // that a particular batch was successfully processed and (2) - // recover the indexes of the adds that were rejected as - // replays. - var replayBuf bytes.Buffer - if err := replays.Encode(&replayBuf); err != nil { - return err - } - - return batchReplayBkt.Put(b.ID, replayBuf.Bytes()) + return nil }); err != nil { return nil, err } diff --git a/htlcswitch/hop/iterator.go b/htlcswitch/hop/iterator.go index 314f05209..553c4921d 100644 --- a/htlcswitch/hop/iterator.go +++ b/htlcswitch/hop/iterator.go @@ -784,6 +784,8 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, b.Val, )) }) + + // TODO(yy): use `p.router.ProcessOnionPacket` instead. err = tx.ProcessOnionPacket( seqNum, onionPkt, req.RHash, req.IncomingCltv, opts..., ) From 755bb09a73f9b996951a455f49f598b320e292c2 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 11 Jun 2025 16:36:52 +0800 Subject: [PATCH 4/5] htlcswitch: skip decoding hop if the htlc is already acked We now move the check earlier in the loop so we don't even need to decode the hop for already processed ADDs. --- htlcswitch/link.go | 48 ++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 33bf1805d..d2bc4f4bf 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3757,10 +3757,26 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { l.log.Tracef("processing %d remote adds for height %d", len(fwdPkg.Adds), fwdPkg.Height) - decodeReqs := make( - []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds), - ) - for _, update := range fwdPkg.Adds { + // decodeReqs is a list of requests sent to the onion decoder. We expect + // the same length of responses to be returned. + decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds)) + + // unackedAdds is a list of ADDs that's waiting for the remote's + // settle/fail update. + unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds)) + + for i, update := range fwdPkg.Adds { + // If this index is already found in the ack filter, the + // response to this forwarding decision has already been + // committed by one of our commitment txns. ADDs in this state + // are waiting for the rest of the fwding package to get acked + // before being garbage collected. + if fwdPkg.State == channeldb.FwdStateProcessed && + fwdPkg.AckFilter.Contains(uint16(i)) { + + continue + } + if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok { // Before adding the new htlc to the state machine, // parse the onion object in order to obtain the @@ -3777,6 +3793,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { } decodeReqs = append(decodeReqs, req) + unackedAdds = append(unackedAdds, msg) } } @@ -3799,23 +3816,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { var switchPackets []*htlcPacket - for i, update := range fwdPkg.Adds { + for i, update := range unackedAdds { idx := uint16(i) - - //nolint:forcetypeassert - add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC) sourceRef := fwdPkg.SourceRef(idx) - - if fwdPkg.State == channeldb.FwdStateProcessed && - fwdPkg.AckFilter.Contains(idx) { - - // If this index is already found in the ack filter, - // the response to this forwarding decision has already - // been committed by one of our commitment txns. ADDs - // in this state are waiting for the rest of the fwding - // package to get acked before being garbage collected. - continue - } + add := *update // An incoming HTLC add has been full-locked in. As a result we // can now examine the forwarding details of the HTLC, and the @@ -3835,8 +3839,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { add.ID, failureCode, add.OnionBlob, &sourceRef, ) - l.log.Errorf("unable to decode onion hop "+ - "iterator: %v", failureCode) + l.log.Errorf("unable to decode onion hop iterator "+ + "for htlc(id=%v, hash=%x): %v", add.ID, + add.PaymentHash, failureCode) + continue } From 2defb116cc970a9ad2876aaec9bfa863b1578f67 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 12 Jun 2025 06:30:27 +0800 Subject: [PATCH 5/5] docs: update release notes --- docs/release-notes/release-notes-0.19.2.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.2.md b/docs/release-notes/release-notes-0.19.2.md index 40f05624a..5952d8c07 100644 --- a/docs/release-notes/release-notes-0.19.2.md +++ b/docs/release-notes/release-notes-0.19.2.md @@ -48,6 +48,11 @@ ## Performance Improvements +- The replay protection is +[optimized](https://github.com/lightningnetwork/lnd/pull/9929) to use less disk +space such that the `sphinxreplay.db` or the `decayedlogdb_kv` table will grow +much more slowly. + ## Deprecations # Technical and Architectural Updates