Merge pull request #9929 from yyforyongyu/fix-decalylog

htlcswitch: remove the bucket `batchReplayBucket`
This commit is contained in:
Olaoluwa Osuntokun
2025-06-20 14:58:45 -07:00
committed by GitHub
5 changed files with 72 additions and 82 deletions

View File

@@ -51,6 +51,11 @@
## Performance Improvements
- The replay protection is
[optimized](https://github.com/lightningnetwork/lnd/pull/9929) to use less disk
space such that the `sphinxreplay.db` or the `decayedlogdb_kv` table will grow
much more slowly.
## Deprecations
# Technical and Architectural Updates

View File

@@ -1,7 +1,6 @@
package htlcswitch
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
@@ -24,11 +23,6 @@ var (
// bytes of a received HTLC's hashed shared secret as the key and the HTLC's
// CLTV expiry as the value.
sharedHashBucket = []byte("shared-hash")
// batchReplayBucket is a bucket that maps batch identifiers to
// serialized ReplaySets. This is used to give idempotency in the event
// that a batch is processed more than once.
batchReplayBucket = []byte("batch-replay")
)
var (
@@ -138,11 +132,6 @@ func (d *DecayedLog) initBuckets() error {
return ErrDecayedLogInit
}
_, err = tx.CreateTopLevelBucket(batchReplayBucket)
if err != nil {
return ErrDecayedLogInit
}
return nil
}, func() {})
}
@@ -329,11 +318,8 @@ func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
// PutBatch accepts a pending batch of hashed secret entries to write to disk.
// Each hashed secret is inserted with a corresponding time value, dictating
// when the entry will be evicted from the log.
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and decoding the return
// value to subsequent calls. For the indices of the replay set to be aligned
// properly, the batch MUST be constructed identically to the first attempt;
// pruning will cause the indices to become invalid.
//
// TODO(yy): remove this method and use `Put` instead.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
// Since batched boltdb txns may be executed multiple times before
// succeeding, we will create a new replay set for each invocation to
@@ -348,25 +334,6 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
return ErrDecayedLogCorrupted
}
// Load the batch replay bucket, which will be used to either
// retrieve the result of previously processing this batch, or
// to write the result of this operation.
batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
if batchReplayBkt == nil {
return ErrDecayedLogCorrupted
}
// Check for the existence of this batch's id in the replay
// bucket. If a non-nil value is found, this indicates that we
// have already processed this batch before. We deserialize the
// resulting replay set and return it to ensure calls to put batch are
// idempotent.
replayBytes := batchReplayBkt.Get(b.ID)
if replayBytes != nil {
replays = sphinx.NewReplaySet()
return replays.Decode(bytes.NewReader(replayBytes))
}
// The CLTV will be stored into scratch and then stored into the
// sharedHashBucket.
var scratch [4]byte
@@ -394,17 +361,7 @@ func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
// batch's construction.
replays.Merge(b.ReplaySet)
// Write the replay set under the batch identifier to the batch
// replays bucket. This can be used during recovery to test (1)
// that a particular batch was successfully processed and (2)
// recover the indexes of the adds that were rejected as
// replays.
var replayBuf bytes.Buffer
if err := replays.Encode(&replayBuf); err != nil {
return err
}
return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
return nil
}); err != nil {
return nil, err
}

View File

@@ -745,7 +745,8 @@ func (r *DecodeHopIteratorResponse) Result() (Iterator, lnwire.FailCode) {
// the presented readers and rhashes *NEVER* deviate across invocations for the
// same id.
func (p *OnionProcessor) DecodeHopIterators(id []byte,
reqs []DecodeHopIteratorRequest) ([]DecodeHopIteratorResponse, error) {
reqs []DecodeHopIteratorRequest,
reforward bool) ([]DecodeHopIteratorResponse, error) {
var (
batchSize = len(reqs)
@@ -783,6 +784,8 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte,
b.Val,
))
})
// TODO(yy): use `p.router.ProcessOnionPacket` instead.
err = tx.ProcessOnionPacket(
seqNum, onionPkt, req.RHash, req.IncomingCltv, opts...,
)
@@ -864,11 +867,12 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte,
continue
}
// If this index is contained in the replay set, mark it with a
// temporary channel failure error code. We infer that the
// offending error was due to a replayed packet because this
// index was found in the replay set.
if replays.Contains(uint16(i)) {
// If this index is contained in the replay set, and it is not a
// reforwarding on startup, mark it with a permanent channel
// failure error code. We infer that the offending error was due
// to a replayed packet because this index was found in the
// replay set.
if !reforward && replays.Contains(uint16(i)) {
log.Errorf("unable to process onion packet: %v",
sphinx.ErrReplayedPacket)

View File

@@ -108,8 +108,10 @@ type ChannelLinkConfig struct {
// blobs, which are then used to inform how to forward an HTLC.
//
// NOTE: This function assumes the same set of readers and preimages
// are always presented for the same identifier.
DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
// are always presented for the same identifier. The last boolean is
// used to decide whether this is a reforwarding or not - when it's
// reforwarding, we skip the replay check enforced in our decay log.
DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
[]hop.DecodeHopIteratorResponse, error)
// ExtractErrorEncrypter function is responsible for decoding HTLC
@@ -3630,6 +3632,13 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
return
}
// Exit early if the fwdPkg is already processed.
if fwdPkg.State == channeldb.FwdStateCompleted {
l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
return
}
l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
var switchPackets []*htlcPacket
@@ -3738,13 +3747,36 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
return
}
// Exit early if the fwdPkg is already processed.
if fwdPkg.State == channeldb.FwdStateCompleted {
l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
return
}
l.log.Tracef("processing %d remote adds for height %d",
len(fwdPkg.Adds), fwdPkg.Height)
decodeReqs := make(
[]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
)
for _, update := range fwdPkg.Adds {
// decodeReqs is a list of requests sent to the onion decoder. We expect
// the same length of responses to be returned.
decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))
// unackedAdds is a list of ADDs that are waiting for the remote's
// settle/fail update.
unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))
for i, update := range fwdPkg.Adds {
// If this index is already found in the ack filter, the
// response to this forwarding decision has already been
// committed by one of our commitment txns. ADDs in this state
// are waiting for the rest of the fwding package to get acked
// before being garbage collected.
if fwdPkg.State == channeldb.FwdStateProcessed &&
fwdPkg.AckFilter.Contains(uint16(i)) {
continue
}
if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
// Before adding the new htlc to the state machine,
// parse the onion object in order to obtain the
@@ -3761,15 +3793,20 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
}
decodeReqs = append(decodeReqs, req)
unackedAdds = append(unackedAdds, msg)
}
}
// If the fwdPkg has already been processed, it means we are
// reforwarding the packets again, which happens only on a restart.
reforward := fwdPkg.State == channeldb.FwdStateProcessed
// Atomically decode the incoming htlcs, simultaneously checking for
// replay attempts. A particular index in the returned, sparse list of
// channel iterators should only be used if the failure code at the
// same index is lnwire.FailCodeNone.
decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
fwdPkg.ID(), decodeReqs,
fwdPkg.ID(), decodeReqs, reforward,
)
if sphinxErr != nil {
l.failf(LinkFailureError{code: ErrInternalError},
@@ -3779,23 +3816,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
var switchPackets []*htlcPacket
for i, update := range fwdPkg.Adds {
for i, update := range unackedAdds {
idx := uint16(i)
//nolint:forcetypeassert
add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
sourceRef := fwdPkg.SourceRef(idx)
if fwdPkg.State == channeldb.FwdStateProcessed &&
fwdPkg.AckFilter.Contains(idx) {
// If this index is already found in the ack filter,
// the response to this forwarding decision has already
// been committed by one of our commitment txns. ADDs
// in this state are waiting for the rest of the fwding
// package to get acked before being garbage collected.
continue
}
add := *update
// An incoming HTLC add has been full-locked in. As a result we
// can now examine the forwarding details of the HTLC, and the
@@ -3815,8 +3839,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
add.ID, failureCode, add.OnionBlob, &sourceRef,
)
l.log.Errorf("unable to decode onion hop "+
"iterator: %v", failureCode)
l.log.Errorf("unable to decode onion hop iterator "+
"for htlc(id=%v, hash=%x): %v", add.ID,
add.PaymentHash, failureCode)
continue
}
@@ -4120,17 +4146,15 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
return
}
replay := fwdPkg.State != channeldb.FwdStateLockedIn
l.log.Debugf("forwarding %d packets to switch: replay=%v",
len(switchPackets), replay)
l.log.Debugf("forwarding %d packets to switch: reforward=%v",
len(switchPackets), reforward)
// NOTE: This call is made synchronous so that we ensure all circuits
// are committed in the exact order that they are processed in the link.
// Failing to do this could cause reorderings/gaps in the range of
// opened circuits, which violates assumptions made by the circuit
// trimming.
l.forwardBatch(replay, switchPackets...)
l.forwardBatch(reforward, switchPackets...)
}
// experimentalEndorsement returns the value to set for our outgoing

View File

@@ -522,7 +522,7 @@ func (p *mockIteratorDecoder) DecodeHopIterator(r io.Reader, rHash []byte,
}
func (p *mockIteratorDecoder) DecodeHopIterators(id []byte,
reqs []hop.DecodeHopIteratorRequest) (
reqs []hop.DecodeHopIteratorRequest, _ bool) (
[]hop.DecodeHopIteratorResponse, error) {
idHash := sha256.Sum256(id)