diff --git a/docs/release-notes/release-notes-0.19.2.md b/docs/release-notes/release-notes-0.19.2.md index 4cf16724c..7dd5820dc 100644 --- a/docs/release-notes/release-notes-0.19.2.md +++ b/docs/release-notes/release-notes-0.19.2.md @@ -51,6 +51,11 @@ ## Performance Improvements +- The replay protection is +[optimized](https://github.com/lightningnetwork/lnd/pull/9929) to use less disk +space such that the `sphinxreplay.db` or the `decayedlogdb_kv` table will grow +much more slowly. + ## Deprecations # Technical and Architectural Updates diff --git a/htlcswitch/decayedlog.go b/htlcswitch/decayedlog.go index ca7e19e7d..e92a45160 100644 --- a/htlcswitch/decayedlog.go +++ b/htlcswitch/decayedlog.go @@ -1,7 +1,6 @@ package htlcswitch import ( - "bytes" "encoding/binary" "errors" "fmt" @@ -24,11 +23,6 @@ var ( // bytes of a received HTLC's hashed shared secret as the key and the HTLC's // CLTV expiry as the value. sharedHashBucket = []byte("shared-hash") - - // batchReplayBucket is a bucket that maps batch identifiers to - // serialized ReplaySets. This is used to give idempotency in the event - // that a batch is processed more than once. - batchReplayBucket = []byte("batch-replay") ) var ( @@ -138,11 +132,6 @@ func (d *DecayedLog) initBuckets() error { return ErrDecayedLogInit } - _, err = tx.CreateTopLevelBucket(batchReplayBucket) - if err != nil { - return ErrDecayedLogInit - } - return nil }, func() {}) } @@ -329,11 +318,8 @@ func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error { // PutBatch accepts a pending batch of hashed secret entries to write to disk. // Each hashed secret is inserted with a corresponding time value, dictating // when the entry will be evicted from the log. -// NOTE: This method enforces idempotency by writing the replay set obtained -// from the first attempt for a particular batch ID, and decoding the return -// value to subsequent calls. 
For the indices of the replay set to be aligned -// properly, the batch MUST be constructed identically to the first attempt, -// pruning will cause the indices to become invalid. +// +// TODO(yy): remove this method and use `Put` instead. func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { // Since batched boltdb txns may be executed multiple times before // succeeding, we will create a new replay set for each invocation to @@ -348,25 +334,6 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { return ErrDecayedLogCorrupted } - // Load the batch replay bucket, which will be used to either - // retrieve the result of previously processing this batch, or - // to write the result of this operation. - batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket) - if batchReplayBkt == nil { - return ErrDecayedLogCorrupted - } - - // Check for the existence of this batch's id in the replay - // bucket. If a non-nil value is found, this indicates that we - // have already processed this batch before. We deserialize the - // resulting and return it to ensure calls to put batch are - // idempotent. - replayBytes := batchReplayBkt.Get(b.ID) - if replayBytes != nil { - replays = sphinx.NewReplaySet() - return replays.Decode(bytes.NewReader(replayBytes)) - } - // The CLTV will be stored into scratch and then stored into the // sharedHashBucket. var scratch [4]byte @@ -394,17 +361,7 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) { // batch's construction. replays.Merge(b.ReplaySet) - // Write the replay set under the batch identifier to the batch - // replays bucket. This can be used during recovery to test (1) - // that a particular batch was successfully processed and (2) - // recover the indexes of the adds that were rejected as - // replays. 
- var replayBuf bytes.Buffer - if err := replays.Encode(&replayBuf); err != nil { - return err - } - - return batchReplayBkt.Put(b.ID, replayBuf.Bytes()) + return nil }); err != nil { return nil, err } diff --git a/htlcswitch/hop/iterator.go b/htlcswitch/hop/iterator.go index 2cf2adb54..553c4921d 100644 --- a/htlcswitch/hop/iterator.go +++ b/htlcswitch/hop/iterator.go @@ -745,7 +745,8 @@ func (r *DecodeHopIteratorResponse) Result() (Iterator, lnwire.FailCode) { // the presented readers and rhashes *NEVER* deviate across invocations for the // same id. func (p *OnionProcessor) DecodeHopIterators(id []byte, - reqs []DecodeHopIteratorRequest) ([]DecodeHopIteratorResponse, error) { + reqs []DecodeHopIteratorRequest, + reforward bool) ([]DecodeHopIteratorResponse, error) { var ( batchSize = len(reqs) @@ -783,6 +784,8 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, b.Val, )) }) + + // TODO(yy): use `p.router.ProcessOnionPacket` instead. err = tx.ProcessOnionPacket( seqNum, onionPkt, req.RHash, req.IncomingCltv, opts..., ) @@ -864,11 +867,12 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, continue } - // If this index is contained in the replay set, mark it with a - // temporary channel failure error code. We infer that the - // offending error was due to a replayed packet because this - // index was found in the replay set. - if replays.Contains(uint16(i)) { + // If this index is contained in the replay set, and it is not a + // reforwarding on startup, mark it with a permanent channel + // failure error code. We infer that the offending error was due + // to a replayed packet because this index was found in the + // replay set. 
+ if !reforward && replays.Contains(uint16(i)) { log.Errorf("unable to process onion packet: %v", sphinx.ErrReplayedPacket) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 72b506986..d2bc4f4bf 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -108,8 +108,10 @@ type ChannelLinkConfig struct { // blobs, which are then used to inform how to forward an HTLC. // // NOTE: This function assumes the same set of readers and preimages - // are always presented for the same identifier. - DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) ( + // are always presented for the same identifier. The last boolean is + // used to decide whether this is a reforwarding or not - when it's + // reforwarding, we skip the replay check enforced in our decay log. + DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) ( []hop.DecodeHopIteratorResponse, error) // ExtractErrorEncrypter function is responsible for decoding HTLC @@ -3630,6 +3632,13 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) { return } + // Exit early if the fwdPkg is already completed. + if fwdPkg.State == channeldb.FwdStateCompleted { + l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) + + return + } + l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter) var switchPackets []*htlcPacket @@ -3738,13 +3747,36 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { return } + // Exit early if the fwdPkg is already completed. + if fwdPkg.State == channeldb.FwdStateCompleted { + l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg) + + return + } + l.log.Tracef("processing %d remote adds for height %d", len(fwdPkg.Adds), fwdPkg.Height) - decodeReqs := make( - []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds), - ) - for _, update := range fwdPkg.Adds { + // decodeReqs is a list of requests sent to the onion decoder. We expect + // the same length of responses to be returned. 
+ decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds)) + + // unackedAdds is a list of ADDs that are waiting for the remote's + // settle/fail update. + unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds)) + + for i, update := range fwdPkg.Adds { + // If this index is already found in the ack filter, the + // response to this forwarding decision has already been + // committed by one of our commitment txns. ADDs in this state + // are waiting for the rest of the fwding package to get acked + // before being garbage collected. + if fwdPkg.State == channeldb.FwdStateProcessed && + fwdPkg.AckFilter.Contains(uint16(i)) { + + continue + } + if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok { // Before adding the new htlc to the state machine, // parse the onion object in order to obtain the @@ -3761,15 +3793,20 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { } decodeReqs = append(decodeReqs, req) + unackedAdds = append(unackedAdds, msg) } } + // If the fwdPkg has already been processed, it means we are + // reforwarding the packets, which happens only on a restart. + reforward := fwdPkg.State == channeldb.FwdStateProcessed + // Atomically decode the incoming htlcs, simultaneously checking for // replay attempts. A particular index in the returned, spare list of // channel iterators should only be used if the failure code at the // same index is lnwire.FailCodeNone. 
decodeResps, sphinxErr := l.cfg.DecodeHopIterators( - fwdPkg.ID(), decodeReqs, + fwdPkg.ID(), decodeReqs, reforward, ) if sphinxErr != nil { l.failf(LinkFailureError{code: ErrInternalError}, @@ -3779,23 +3816,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { var switchPackets []*htlcPacket - for i, update := range fwdPkg.Adds { + for i, update := range unackedAdds { idx := uint16(i) - - //nolint:forcetypeassert - add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC) sourceRef := fwdPkg.SourceRef(idx) - - if fwdPkg.State == channeldb.FwdStateProcessed && - fwdPkg.AckFilter.Contains(idx) { - - // If this index is already found in the ack filter, - // the response to this forwarding decision has already - // been committed by one of our commitment txns. ADDs - // in this state are waiting for the rest of the fwding - // package to get acked before being garbage collected. - continue - } + add := *update // An incoming HTLC add has been full-locked in. As a result we // can now examine the forwarding details of the HTLC, and the @@ -3815,8 +3839,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { add.ID, failureCode, add.OnionBlob, &sourceRef, ) - l.log.Errorf("unable to decode onion hop "+ - "iterator: %v", failureCode) + l.log.Errorf("unable to decode onion hop iterator "+ + "for htlc(id=%v, hash=%x): %v", add.ID, + add.PaymentHash, failureCode) + continue } @@ -4120,17 +4146,15 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) { return } - replay := fwdPkg.State != channeldb.FwdStateLockedIn - - l.log.Debugf("forwarding %d packets to switch: replay=%v", - len(switchPackets), replay) + l.log.Debugf("forwarding %d packets to switch: reforward=%v", + len(switchPackets), reforward) // NOTE: This call is made synchronous so that we ensure all circuits // are committed in the exact order that they are processed in the link. 
// Failing to do this could cause reorderings/gaps in the range of // opened circuits, which violates assumptions made by the circuit // trimming. - l.forwardBatch(replay, switchPackets...) + l.forwardBatch(reforward, switchPackets...) } // experimentalEndorsement returns the value to set for our outgoing diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index cb814e95c..136d5c39b 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -522,7 +522,7 @@ func (p *mockIteratorDecoder) DecodeHopIterator(r io.Reader, rHash []byte, } func (p *mockIteratorDecoder) DecodeHopIterators(id []byte, - reqs []hop.DecodeHopIteratorRequest) ( + reqs []hop.DecodeHopIteratorRequest, _ bool) ( []hop.DecodeHopIteratorResponse, error) { idHash := sha256.Sum256(id)