sweep: add new method handleNewInput

This commit is contained in:
yyforyongyu 2023-08-08 14:04:22 +08:00
parent 15f4213793
commit 39973687aa
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

@ -607,80 +607,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
for {
select {
// A new inputs is offered to the sweeper. We check to see if we
// are already trying to sweep this input and if not, set up a
// listener to spend and schedule a sweep.
// A new inputs is offered to the sweeper. We check to see if
// we are already trying to sweep this input and if not, set up
// a listener to spend and schedule a sweep.
case input := <-s.newInputs:
outpoint := *input.input.OutPoint()
pendInput, pending := s.pendingInputs[outpoint]
if pending {
log.Debugf("Already pending input %v received",
outpoint)
// Before updating the input details, check if
// an exclusive group was set, and if so, assume
// this input as finalized and remove all other
// inputs belonging to the same exclusive group.
var prevExclGroup *uint64
if pendInput.params.ExclusiveGroup != nil &&
input.params.ExclusiveGroup == nil {
prevExclGroup = new(uint64)
*prevExclGroup = *pendInput.params.ExclusiveGroup
}
// Update input details and sweep parameters.
// The re-offered input details may contain a
// change to the unconfirmed parent tx info.
pendInput.params = input.params
pendInput.Input = input.input
// Add additional result channel to signal
// spend of this input.
pendInput.listeners = append(
pendInput.listeners, input.resultChan,
)
if prevExclGroup != nil {
s.removeExclusiveGroup(*prevExclGroup)
}
continue
}
// Create a new pendingInput and initialize the
// listeners slice with the passed in result channel. If
// this input is offered for sweep again, the result
// channel will be appended to this slice.
pendInput = &pendingInput{
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: bestHeight,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
log.Tracef("input %v added to pendingInputs", outpoint)
// Start watching for spend of this input, either by us
// or the remote party.
cancel, err := s.waitForSpend(
outpoint,
input.input.SignDesc().Output.PkScript,
input.input.HeightHint(),
)
if err != nil {
err := fmt.Errorf("wait for spend: %v", err)
s.signalAndRemove(&outpoint, Result{Err: err})
continue
}
pendInput.ntfnRegCancel = cancel
// Check to see if with this new input a sweep tx can be
// formed.
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("schedule sweep: %v", err)
}
log.Tracef("input %v scheduled", outpoint)
s.handleNewInput(input, bestHeight)
// A spend of one of our inputs is detected. Signal sweep
// results to the caller(s).
@ -1651,6 +1582,94 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
return s.cfg.Store.ListSweeps()
}
// handleNewInput processes a new input by registering spend notification and
// scheduling sweeping for it.
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage,
bestHeight int32) {
outpoint := *input.input.OutPoint()
pendInput, pending := s.pendingInputs[outpoint]
if pending {
log.Debugf("Already pending input %v received", outpoint)
s.handleExistingInput(input, pendInput)
return
}
// Create a new pendingInput and initialize the listeners slice with
// the passed in result channel. If this input is offered for sweep
// again, the result channel will be appended to this slice.
pendInput = &pendingInput{
listeners: []chan Result{input.resultChan},
Input: input.input,
minPublishHeight: bestHeight,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
log.Tracef("input %v added to pendingInputs", outpoint)
// Start watching for spend of this input, either by us or the remote
// party.
cancel, err := s.waitForSpend(
outpoint, input.input.SignDesc().Output.PkScript,
input.input.HeightHint(),
)
if err != nil {
err := fmt.Errorf("wait for spend: %w", err)
s.signalAndRemove(&outpoint, Result{Err: err})
return
}
pendInput.ntfnRegCancel = cancel
// Check to see if with this new input a sweep tx can be formed.
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("schedule sweep: %v", err)
}
log.Tracef("input %v scheduled", outpoint)
}
// handleExistingInput processes an input that is already known to the sweeper.
// It will overwrite the params of the old input with the new ones.
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
oldInput *pendingInput) {
// Before updating the input details, check if an exclusive group was
// set. In case the same input is registered again without an exclusive
// group set, the previous input and its sweep parameters are outdated
// hence need to be replaced. This scenario currently only happens for
// anchor outputs. When a channel is force closed, in the worst case 3
// different sweeps with the same exclusive group are registered with
// the sweeper to bump the closing transaction (cpfp) when its time
// critical. Receiving an input which was already registered with the
// sweeper but now without an exclusive group means non of the previous
// inputs were used as CPFP, so we need to make sure we update the
// sweep parameters but also remove all inputs with the same exclusive
// group because the are outdated too.
var prevExclGroup *uint64
if oldInput.params.ExclusiveGroup != nil &&
input.params.ExclusiveGroup == nil {
prevExclGroup = new(uint64)
*prevExclGroup = *oldInput.params.ExclusiveGroup
}
// Update input details and sweep parameters. The re-offered input
// details may contain a change to the unconfirmed parent tx info.
oldInput.params = input.params
oldInput.Input = input.input
// Add additional result channel to signal spend of this input.
oldInput.listeners = append(oldInput.listeners, input.resultChan)
if prevExclGroup != nil {
s.removeExclusiveGroup(*prevExclGroup)
}
}
// init initializes the random generator for random input rescheduling.
func init() {
rand.Seed(time.Now().Unix())