diff --git a/sweep/sweeper.go b/sweep/sweeper.go index cda02db9f..7347c37a8 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -607,80 +607,11 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { for { select { - // A new inputs is offered to the sweeper. We check to see if we - // are already trying to sweep this input and if not, set up a - // listener to spend and schedule a sweep. + // A new inputs is offered to the sweeper. We check to see if + // we are already trying to sweep this input and if not, set up + // a listener to spend and schedule a sweep. case input := <-s.newInputs: - outpoint := *input.input.OutPoint() - pendInput, pending := s.pendingInputs[outpoint] - if pending { - log.Debugf("Already pending input %v received", - outpoint) - - // Before updating the input details, check if - // an exclusive group was set, and if so, assume - // this input as finalized and remove all other - // inputs belonging to the same exclusive group. - var prevExclGroup *uint64 - if pendInput.params.ExclusiveGroup != nil && - input.params.ExclusiveGroup == nil { - - prevExclGroup = new(uint64) - *prevExclGroup = *pendInput.params.ExclusiveGroup - } - - // Update input details and sweep parameters. - // The re-offered input details may contain a - // change to the unconfirmed parent tx info. - pendInput.params = input.params - pendInput.Input = input.input - - // Add additional result channel to signal - // spend of this input. - pendInput.listeners = append( - pendInput.listeners, input.resultChan, - ) - - if prevExclGroup != nil { - s.removeExclusiveGroup(*prevExclGroup) - } - - continue - } - - // Create a new pendingInput and initialize the - // listeners slice with the passed in result channel. If - // this input is offered for sweep again, the result - // channel will be appended to this slice. - pendInput = &pendingInput{ - listeners: []chan Result{input.resultChan}, - Input: input.input, - minPublishHeight: bestHeight, - params: input.params, - } - s.pendingInputs[outpoint] = pendInput - log.Tracef("input %v added to pendingInputs", outpoint) - - // Start watching for spend of this input, either by us - // or the remote party. - cancel, err := s.waitForSpend( - outpoint, - input.input.SignDesc().Output.PkScript, - input.input.HeightHint(), - ) - if err != nil { - err := fmt.Errorf("wait for spend: %v", err) - s.signalAndRemove(&outpoint, Result{Err: err}) - continue - } - pendInput.ntfnRegCancel = cancel - - // Check to see if with this new input a sweep tx can be - // formed. - if err := s.scheduleSweep(bestHeight); err != nil { - log.Errorf("schedule sweep: %v", err) - } - log.Tracef("input %v scheduled", outpoint) + s.handleNewInput(input, bestHeight) // A spend of one of our inputs is detected. Signal sweep // results to the caller(s). @@ -1651,6 +1582,94 @@ func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) { return s.cfg.Store.ListSweeps() } +// handleNewInput processes a new input by registering spend notification and +// scheduling sweeping for it. +func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage, + bestHeight int32) { + + outpoint := *input.input.OutPoint() + pendInput, pending := s.pendingInputs[outpoint] + if pending { + log.Debugf("Already pending input %v received", outpoint) + + s.handleExistingInput(input, pendInput) + + return + } + + // Create a new pendingInput and initialize the listeners slice with + // the passed in result channel. If this input is offered for sweep + // again, the result channel will be appended to this slice. + pendInput = &pendingInput{ + listeners: []chan Result{input.resultChan}, + Input: input.input, + minPublishHeight: bestHeight, + params: input.params, + } + s.pendingInputs[outpoint] = pendInput + log.Tracef("input %v added to pendingInputs", outpoint) + + // Start watching for spend of this input, either by us or the remote + // party. + cancel, err := s.waitForSpend( + outpoint, input.input.SignDesc().Output.PkScript, + input.input.HeightHint(), + ) + if err != nil { + err := fmt.Errorf("wait for spend: %w", err) + s.signalAndRemove(&outpoint, Result{Err: err}) + + return + } + + pendInput.ntfnRegCancel = cancel + + // Check to see if with this new input a sweep tx can be formed. + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } + + log.Tracef("input %v scheduled", outpoint) +} + +// handleExistingInput processes an input that is already known to the sweeper. +// It will overwrite the params of the old input with the new ones. +func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage, + oldInput *pendingInput) { + + // Before updating the input details, check if an exclusive group was + // set. In case the same input is registered again without an exclusive + // group set, the previous input and its sweep parameters are outdated + // hence need to be replaced. This scenario currently only happens for + // anchor outputs. When a channel is force closed, in the worst case 3 + // different sweeps with the same exclusive group are registered with + // the sweeper to bump the closing transaction (cpfp) when its time + // critical. Receiving an input which was already registered with the + // sweeper but now without an exclusive group means non of the previous + // inputs were used as CPFP, so we need to make sure we update the + // sweep parameters but also remove all inputs with the same exclusive + // group because the are outdated too. + var prevExclGroup *uint64 + if oldInput.params.ExclusiveGroup != nil && + input.params.ExclusiveGroup == nil { + + prevExclGroup = new(uint64) + *prevExclGroup = *oldInput.params.ExclusiveGroup + } + + // Update input details and sweep parameters. The re-offered input + // details may contain a change to the unconfirmed parent tx info. + oldInput.params = input.params + oldInput.Input = input.input + + // Add additional result channel to signal spend of this input. + oldInput.listeners = append(oldInput.listeners, input.resultChan) + + if prevExclGroup != nil { + s.removeExclusiveGroup(*prevExclGroup) + } +} + // init initializes the random generator for random input rescheduling. func init() { rand.Seed(time.Now().Unix())