diff --git a/sweep/defaults.go b/sweep/defaults.go new file mode 100644 index 000000000..d9b5c9231 --- /dev/null +++ b/sweep/defaults.go @@ -0,0 +1,14 @@ +// +build !rpctest + +package sweep + +import ( + "time" +) + +var ( + // DefaultBatchWindowDuration specifies duration of the sweep batch + // window. The sweep is held back during the batch window to allow more + // inputs to be added and thereby lower the fee per input. + DefaultBatchWindowDuration = 30 * time.Second +) diff --git a/sweep/defaults_rpctest.go b/sweep/defaults_rpctest.go new file mode 100644 index 000000000..6d027ab6f --- /dev/null +++ b/sweep/defaults_rpctest.go @@ -0,0 +1,17 @@ +// +build rpctest + +package sweep + +import ( + "time" +) + +var ( + // DefaultBatchWindowDuration specifies duration of the sweep batch + // window. The sweep is held back during the batch window to allow more + // inputs to be added and thereby lower the fee per input. + // + // To speed up integration tests waiting for a sweep to happen, the + // batch window is shortened. + DefaultBatchWindowDuration = 2 * time.Second +) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 52e6b529d..3829d27e6 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1,21 +1,88 @@ package sweep import ( + "errors" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnwallet" ) -// UtxoSweeper provides the functionality to generate sweep txes. The plan is -// to extend UtxoSweeper in the future to also manage the actual sweeping -// process by itself. +var ( + // ErrRemoteSpend is returned in case an output that we try to sweep is + // confirmed in a tx of the remote party. + ErrRemoteSpend = errors.New("remote party swept utxo") + + // ErrTooManyAttempts is returned in case sweeping an output has failed + // for the configured max number of attempts. + ErrTooManyAttempts = errors.New("sweep failed after max attempts") + + // DefaultMaxSweepAttempts specifies the default maximum number of times + // an input is included in a publish attempt before giving up and + // returning an error to the caller. + DefaultMaxSweepAttempts = 10 +) + +// pendingInput is created when an input reaches the main loop for the first +// time. It tracks all relevant state that is needed for sweeping. +type pendingInput struct { + // listeners is a list of channels over which the final outcome of the + // sweep needs to be broadcasted. + listeners []chan Result + + // input is the original struct that contains the input and sign + // descriptor. + input Input + + // ntfnRegCancel is populated with a function that cancels the chain + // notifier spend registration. + ntfnRegCancel func() + + // minPublishHeight indicates the minimum block height at which this + // input may be (re)published. + minPublishHeight int32 + + // publishAttempts records the number of attempts that have already been + // made to sweep this tx. + publishAttempts int +} + +// UtxoSweeper is responsible for sweeping outputs back into the wallet type UtxoSweeper struct { + started uint32 // To be used atomically. + stopped uint32 // To be used atomically. + cfg *UtxoSweeperConfig + + newInputs chan *sweepInputMessage + spendChan chan *chainntnfs.SpendDetail + + pendingInputs map[wire.OutPoint]*pendingInput + + // timer is the channel that signals expiry of the sweep batch timer. + timer <-chan time.Time + + testSpendChan chan wire.OutPoint + + currentOutputScript []byte + + relayFeePerKW lnwallet.SatPerKWeight + + quit chan struct{} + wg sync.WaitGroup } // UtxoSweeperConfig contains dependencies of UtxoSweeper. type UtxoSweeperConfig struct { - // GenSweepScript generates a P2WKH script belonging to the wallet - // where funds can be swept. + // GenSweepScript generates a P2WKH script belonging to the wallet where + // funds can be swept. GenSweepScript func() ([]byte, error) // Estimator is used when crafting sweep transactions to estimate the @@ -23,18 +90,651 @@ type UtxoSweeperConfig struct { // transaction. Estimator lnwallet.FeeEstimator + // PublishTransaction facilitates the process of broadcasting a signed + // transaction to the appropriate network. + PublishTransaction func(*wire.MsgTx) error + + // NewBatchTimer creates a channel that will be sent on when a certain + // time window has passed. During this time window, new inputs can still + // be added to the sweep tx that is about to be generated. + NewBatchTimer func() <-chan time.Time + + // Notifier is an instance of a chain notifier we'll use to watch for + // certain on-chain events. + Notifier chainntnfs.ChainNotifier + + // ChainIO is used to determine the current block height. + ChainIO lnwallet.BlockChainIO + + // Store stores the published sweeper txes. + Store SweeperStore + // Signer is used by the sweeper to generate valid witnesses at the // time the incubated outputs need to be spent. Signer lnwallet.Signer + + // SweepTxConfTarget assigns a confirmation target for sweep txes on + // which the fee calculation will be based. + SweepTxConfTarget uint32 + + // MaxInputsPerTx specifies the default maximum number of inputs allowed + // in a single sweep tx. If more need to be swept, multiple txes are + // created and published. + MaxInputsPerTx int + + // MaxSweepAttempts specifies the maximum number of times an input is + // included in a publish attempt before giving up and returning an error + // to the caller. + MaxSweepAttempts int + + // NextAttemptDeltaFunc returns given the number of already attempted + // sweeps, how many blocks to wait before retrying to sweep. + NextAttemptDeltaFunc func(int) int32 } -// New returns a new UtxoSweeper instance. +// Result is the struct that is pushed through the result channel. Callers can +// use this to be informed of the final sweep result. In case of a remote +// spend, Err will be ErrRemoteSpend. +type Result struct { + // Err is the final result of the sweep. It is nil when the input is + // swept successfully by us. ErrRemoteSpend is returned when another + // party took the input. + Err error + + // Tx is the transaction that spent the input. + Tx *wire.MsgTx +} + +// sweepInputMessage structs are used in the internal channel between the +// SweepInput call and the sweeper main loop. +type sweepInputMessage struct { + input Input + resultChan chan Result +} + +// New returns a new Sweeper instance. func New(cfg *UtxoSweeperConfig) *UtxoSweeper { + return &UtxoSweeper{ - cfg: cfg, + cfg: cfg, + newInputs: make(chan *sweepInputMessage), + spendChan: make(chan *chainntnfs.SpendDetail), + quit: make(chan struct{}), + pendingInputs: make(map[wire.OutPoint]*pendingInput), } } +// Start starts the process of constructing and publish sweep txes. +func (s *UtxoSweeper) Start() error { + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return nil + } + + log.Tracef("Sweeper starting") + + // Retrieve last published tx from database. + lastTx, err := s.cfg.Store.GetLastPublishedTx() + if err != nil { + return fmt.Errorf("get last published tx: %v", err) + } + + // Republish in case the previous call crashed lnd. We don't care about + // the return value, because inputs will be re-offered and retried + // anyway. The only reason we republish here is to prevent the corner + // case where lnd goes into a restart loop because of a crashing publish + // tx where we keep deriving new output script. By publishing and + // possibly crashing already now, we haven't derived a new output script + // yet. + if lastTx != nil { + log.Debugf("Publishing last tx %v", lastTx.TxHash()) + + // Error can be ignored. Because we are starting up, there are + // no pending inputs to update based on the publish result. + err := s.cfg.PublishTransaction(lastTx) + if err != nil && err != lnwallet.ErrDoubleSpend { + log.Errorf("last tx publish: %v", err) + } + } + + // Retrieve relay fee for dust limit calculation. Assume that this will + // not change from here on. + s.relayFeePerKW = s.cfg.Estimator.RelayFeePerKW() + + // Register for block epochs to retry sweeping every block. + bestHash, bestHeight, err := s.cfg.ChainIO.GetBestBlock() + if err != nil { + return fmt.Errorf("get best block: %v", err) + } + + log.Debugf("Best height: %v", bestHeight) + + blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn( + &chainntnfs.BlockEpoch{ + Height: bestHeight, + Hash: bestHash, + }, + ) + if err != nil { + return fmt.Errorf("register block epoch ntfn: %v", err) + } + + // Start sweeper main loop. + s.wg.Add(1) + go func() { + defer blockEpochs.Cancel() + defer s.wg.Done() + + err := s.collector(blockEpochs.Epochs, bestHeight) + if err != nil { + log.Errorf("sweeper stopped: %v", err) + } + }() + + return nil +} + +// Stop stops sweeper from listening to block epochs and constructing sweep +// txes. +func (s *UtxoSweeper) Stop() error { + if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + return nil + } + + log.Debugf("Sweeper shutting down") + + close(s.quit) + s.wg.Wait() + + log.Debugf("Sweeper shut down") + + return nil +} + +// SweepInput sweeps inputs back into the wallet. The inputs will be batched and +// swept after the batch time window ends. +// +// NOTE: Extreme care needs to be taken that input isn't changed externally. +// Because it is an interface and we don't know what is exactly behind it, we +// cannot make a local copy in sweeper. +func (s *UtxoSweeper) SweepInput(input Input) (chan Result, error) { + if input == nil || input.OutPoint() == nil || input.SignDesc() == nil { + return nil, errors.New("nil input received") + } + + log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+ + "time_lock=%v, size=%v", input.OutPoint(), input.WitnessType(), + input.BlocksToMaturity(), + btcutil.Amount(input.SignDesc().Output.Value)) + + sweeperInput := &sweepInputMessage{ + input: input, + resultChan: make(chan Result, 1), + } + + // Deliver input to main event loop. + select { + case s.newInputs <- sweeperInput: + case <-s.quit: + return nil, fmt.Errorf("sweeper shutting down") + } + + return sweeperInput.resultChan, nil +} + +// collector is the sweeper main loop. It processes new inputs, spend +// notifications and counts down to publication of the sweep tx. +func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, + bestHeight int32) error { + + for { + select { + // A new inputs is offered to the sweeper. We check to see if we + // are already trying to sweep this input and if not, set up a + // listener for spend and schedule a sweep. + case input := <-s.newInputs: + outpoint := *input.input.OutPoint() + pendInput, pending := s.pendingInputs[outpoint] + if pending { + log.Debugf("Already pending input %v received", + outpoint) + + // Add additional result channel to signal + // spend of this input. + pendInput.listeners = append( + pendInput.listeners, input.resultChan, + ) + continue + } + + // Create a new pendingInput and initialize the + // listeners slice with the passed in result channel. If + // this input is offered for sweep again, the result + // channel will be appended to this slice. + pendInput = &pendingInput{ + listeners: []chan Result{input.resultChan}, + input: input.input, + minPublishHeight: bestHeight, + } + s.pendingInputs[outpoint] = pendInput + + // Start watching for spend of this input, either by us + // or the remote party. + cancel, err := s.waitForSpend( + outpoint, + input.input.SignDesc().Output.PkScript, + input.input.HeightHint(), + ) + if err != nil { + err := fmt.Errorf("wait for spend: %v", err) + s.signalAndRemove(&outpoint, Result{Err: err}) + continue + } + pendInput.ntfnRegCancel = cancel + + // Check to see if with this new input a sweep tx can be + // formed. + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } + + // A spend of one of our inputs is detected. Signal sweep + // results to the caller(s). + case spend := <-s.spendChan: + // For testing purposes. + if s.testSpendChan != nil { + s.testSpendChan <- *spend.SpentOutPoint + } + + // Query store to find out if we every published this + // tx. + spendHash := *spend.SpenderTxHash + isOurTx, err := s.cfg.Store.IsOurTx(spendHash) + if err != nil { + log.Errorf("cannot determine if tx %v "+ + "is ours: %v", spendHash, err, + ) + continue + } + + log.Debugf("Detected spend related to in flight inputs "+ + "(is_ours=%v): %v", + newLogClosure(func() string { + return spew.Sdump(spend.SpendingTx) + }), isOurTx, + ) + + // Signal sweep results for inputs in this confirmed + // tx. + for _, txIn := range spend.SpendingTx.TxIn { + outpoint := txIn.PreviousOutPoint + + // Check if this input is known to us. It could + // probably be unknown if we canceled the + // registration, deleted from pendingInputs but + // the ntfn was in-flight already. Or this could + // be not one of our inputs. + _, ok := s.pendingInputs[outpoint] + if !ok { + continue + } + + // Return either a nil or a remote spend result. + var err error + if !isOurTx { + err = ErrRemoteSpend + } + + // Signal result channels. + s.signalAndRemove(&outpoint, Result{ + Tx: spend.SpendingTx, + Err: err, + }) + } + + // Now that an input of ours is spent, we can try to + // resweep the remaining inputs. + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } + + // The timer expires and we are going to (re)sweep. + case <-s.timer: + log.Debugf("Sweep timer expired") + + // Set timer to nil so we know that a new timer needs to + // be started when new inputs arrive. + s.timer = nil + + // Retrieve fee estimate for input filtering and final + // tx fee calculation. + satPerKW, err := s.cfg.Estimator.EstimateFeePerKW( + s.cfg.SweepTxConfTarget, + ) + if err != nil { + log.Errorf("estimate fee: %v", err) + continue + } + + // Examine pending inputs and try to construct lists of + // inputs. + inputLists, err := s.getInputLists(bestHeight, satPerKW) + if err != nil { + log.Errorf("get input lists: %v", err) + continue + } + + // Sweep selected inputs. + for _, inputs := range inputLists { + err := s.sweep(inputs, satPerKW, bestHeight) + if err != nil { + log.Errorf("sweep: %v", err) + } + } + + // A new block comes in. Things may have changed, so we retry a + // sweep. + case epoch, ok := <-blockEpochs: + if !ok { + return nil + } + + bestHeight = epoch.Height + + log.Debugf("New blocks: height=%v, sha=%v", + epoch.Height, epoch.Hash) + + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("schedule sweep: %v", err) + } + + case <-s.quit: + return nil + } + } +} + +// scheduleSweep starts the sweep timer to create an opportunity for more inputs +// to be added. +func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error { + // The timer is already ticking, no action needed for the sweep to + // happen. + if s.timer != nil { + log.Debugf("Timer still ticking") + return nil + } + + // Retrieve fee estimate for input filtering and final tx fee + // calculation. + satPerKW, err := s.cfg.Estimator.EstimateFeePerKW( + s.cfg.SweepTxConfTarget, + ) + if err != nil { + return fmt.Errorf("estimate fee: %v", err) + } + + // Examine pending inputs and try to construct lists of inputs. + inputLists, err := s.getInputLists(currentHeight, satPerKW) + if err != nil { + return fmt.Errorf("get input lists: %v", err) + } + + log.Infof("Sweep candidates at height=%v, yield %v distinct txns", + currentHeight, len(inputLists)) + + // If there are no input sets, there is nothing sweepable and we can + // return without starting the timer. + if len(inputLists) == 0 { + return nil + } + + // Start sweep timer to create opportunity for more inputs to be added + // before a tx is constructed. + s.timer = s.cfg.NewBatchTimer() + + log.Debugf("Sweep timer started") + + return nil +} + +// signalAndRemove notifies the listeners of the final result of the input +// sweep. It cancels any pending spend notification and removes the input from +// the list of pending inputs. When this function returns, the sweeper has +// completely forgotten about the input. +func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) { + pendInput := s.pendingInputs[*outpoint] + listeners := pendInput.listeners + + if result.Err == nil { + log.Debugf("Dispatching sweep success for %v to %v listeners", + outpoint, len(listeners), + ) + } else { + log.Debugf("Dispatching sweep error for %v to %v listeners: %v", + outpoint, len(listeners), result.Err, + ) + } + + // Signal all listeners. Channel is buffered. Because we only send once + // on every channel, it should never block. + for _, resultChan := range listeners { + resultChan <- result + } + + // Cancel spend notification with chain notifier. This is not necessary + // in case of a success, except for that a reorg could still happen. + if pendInput.ntfnRegCancel != nil { + log.Debugf("Canceling spend ntfn for %v", outpoint) + + pendInput.ntfnRegCancel() + } + + // Inputs are no longer pending after result has been sent. + delete(s.pendingInputs, *outpoint) +} + +// getInputLists goes through all pending inputs and constructs sweep lists, +// each up to the configured maximum number of inputs. Negative yield inputs are +// skipped. Transactions with an output below the dust limit are not published. +// Those inputs remain pending and will be bundled with future inputs if +// possible. +func (s *UtxoSweeper) getInputLists(currentHeight int32, + satPerKW lnwallet.SatPerKWeight) ([]inputSet, error) { + + // Filter for inputs that need to be swept. Create two lists: all + // sweepable inputs and a list containing only the new, never tried + // inputs. + // + // We want to create as large a tx as possible, so we return a final set + // list that starts with sets created from all inputs. However, there is + // a chance that those txes will not publish, because they already + // contain inputs that failed before. Therefore we also add sets + // consisting of only new inputs to the list, to make sure that new + // inputs are given a good, isolated chance of being published. + var newInputs, retryInputs []Input + for _, input := range s.pendingInputs { + // Skip inputs that have a minimum publish height that is not + // yet reached. + if input.minPublishHeight > currentHeight { + continue + } + + // Add input to the either one of the lists. + if input.publishAttempts == 0 { + newInputs = append(newInputs, input.input) + } else { + retryInputs = append(retryInputs, input.input) + } + } + + // If there is anything to retry, combine it with the new inputs and + // form input sets. + var allSets []inputSet + if len(retryInputs) > 0 { + var err error + allSets, err = generateInputPartitionings( + append(retryInputs, newInputs...), + s.relayFeePerKW, satPerKW, + s.cfg.MaxInputsPerTx, + ) + if err != nil { + return nil, fmt.Errorf("input partitionings: %v", err) + } + } + + // Create sets for just the new inputs. + newSets, err := generateInputPartitionings( + newInputs, + s.relayFeePerKW, satPerKW, + s.cfg.MaxInputsPerTx, + ) + if err != nil { + return nil, fmt.Errorf("input partitionings: %v", err) + } + + log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+ + "total_num_new=%v", currentHeight, len(allSets), len(newSets)) + + // Append the new sets at the end of the list, because those tx likely + // have a higher fee per input. + return append(allSets, newSets...), nil +} + +// sweep takes a set of preselected inputs, creates a sweep tx and publishes the +// tx. The output address is only marked as used if the publish succeeds. +func (s *UtxoSweeper) sweep(inputs inputSet, + satPerKW lnwallet.SatPerKWeight, currentHeight int32) error { + + var err error + + // Generate output script if no unused script available. + if s.currentOutputScript == nil { + s.currentOutputScript, err = s.cfg.GenSweepScript() + if err != nil { + return fmt.Errorf("gen sweep script: %v", err) + } + } + + // Create sweep tx. + tx, err := createSweepTx( + inputs, s.currentOutputScript, + uint32(currentHeight), satPerKW, s.cfg.Signer, + ) + if err != nil { + return fmt.Errorf("create sweep tx: %v", err) + } + + // Add tx before publication, so that we will always know that a spend + // by this tx is ours. Otherwise if the publish doesn't return, but did + // publish, we loose track of this tx. Even republication on startup + // doesn't prevent this, because that call returns a double spend error + // then and would also not add the hash to the store. + err = s.cfg.Store.NotifyPublishTx(tx) + if err != nil { + return fmt.Errorf("notify publish tx: %v", err) + } + + // Publish sweep tx. + log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v", + tx.TxHash(), len(tx.TxIn), currentHeight) + + log.Tracef("Sweep tx at height=%v: %v", currentHeight, + newLogClosure(func() string { + return spew.Sdump(tx) + }), + ) + + err = s.cfg.PublishTransaction(tx) + + // In case of an unexpected error, don't try to recover. + if err != nil && err != lnwallet.ErrDoubleSpend { + return fmt.Errorf("publish tx: %v", err) + } + + // Keep outputScript in case of an error, so that it can be reused for + // the next tx and causes no address inflation. + if err == nil { + s.currentOutputScript = nil + } + + // Reschedule sweep. + for _, input := range tx.TxIn { + pi, ok := s.pendingInputs[input.PreviousOutPoint] + if !ok { + // It can be that the input has been removed because it + // exceed the maximum number of attempts in a previous + // input set. + continue + } + + // Record another publish attempt. + pi.publishAttempts++ + + // We don't care what the result of the publish call was. Even + // if it is published successfully, it can still be that it + // needs to be retried. Call NextAttemptDeltaFunc to calculate + // when to resweep this input. + nextAttemptDelta := s.cfg.NextAttemptDeltaFunc( + pi.publishAttempts, + ) + + pi.minPublishHeight = currentHeight + nextAttemptDelta + + log.Debugf("Rescheduling input %v after %v attempts at "+ + "height %v (delta %v)", input.PreviousOutPoint, + pi.publishAttempts, pi.minPublishHeight, + nextAttemptDelta) + + if pi.publishAttempts >= s.cfg.MaxSweepAttempts { + // Signal result channels sweep result. + s.signalAndRemove(&input.PreviousOutPoint, Result{ + Err: ErrTooManyAttempts, + }) + } + } + return nil +} + +// waitForSpend registers a spend notification with the chain notifier. It +// returns a cancel function that can be used to cancel the registration. +func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint, + script []byte, heightHint uint32) (func(), error) { + + log.Debugf("Wait for spend of %v", outpoint) + + spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn( + &outpoint, script, heightHint, + ) + if err != nil { + return nil, fmt.Errorf("register spend ntfn: %v", err) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + select { + case spend, ok := <-spendEvent.Spend: + if !ok { + log.Debugf("Spend ntfn for %v canceled", + outpoint) + return + } + + log.Debugf("Delivering spend ntfn for %v", + outpoint) + select { + case s.spendChan <- spend: + log.Debugf("Delivered spend ntfn for %v", + outpoint) + + case <-s.quit: + } + case <-s.quit: + } + }() + + return spendEvent.Cancel, nil +} + // CreateSweepTx accepts a list of inputs and signs and generates a txn that // spends from them. This method also makes an accurate fee estimate before // generating the required witnesses. @@ -53,14 +753,13 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper { func (s *UtxoSweeper) CreateSweepTx(inputs []Input, confTarget uint32, currentBlockHeight uint32) (*wire.MsgTx, error) { - // Generate the receiving script to which the funds will be swept. - pkScript, err := s.cfg.GenSweepScript() + feePerKw, err := s.cfg.Estimator.EstimateFeePerKW(confTarget) if err != nil { return nil, err } - // Using the txn weight estimate, compute the required txn fee. - feePerKw, err := s.cfg.Estimator.EstimateFeePerKW(confTarget) + // Generate the receiving script to which the funds will be swept. + pkScript, err := s.cfg.GenSweepScript() if err != nil { return nil, err } @@ -69,3 +768,16 @@ func (s *UtxoSweeper) CreateSweepTx(inputs []Input, confTarget uint32, inputs, pkScript, currentBlockHeight, feePerKw, s.cfg.Signer, ) } + +// DefaultNextAttemptDeltaFunc is the default calculation for next sweep attempt +// scheduling. It implements exponential back-off with some randomness. This is +// to prevent a stuck tx (for example because fee is too low and can't be bumped +// in btcd) from blocking all other retried inputs in the same tx. +func DefaultNextAttemptDeltaFunc(attempts int) int32 { + return 1 + rand.Int31n(1<