sweep: introduce Bumper interface to handle RBF

This commit adds a new interface, `Bumper`, to handle RBF for a given
input set. It's responsible for creating the sweeping tx using the input
set, and monitors its confirmation status to decide whether a RBF should
be attempted or not.

We leave implementation details to future commits, and focus on mounting
this `Bumper` interface to our sweeper in this commit.
This commit is contained in:
yyforyongyu
2024-01-17 17:21:09 +08:00
parent a088501e47
commit 1187b868ad
5 changed files with 812 additions and 51 deletions

View File

@@ -13,7 +13,6 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
@@ -41,6 +40,12 @@ var (
// an input is included in a publish attempt before giving up and
// returning an error to the caller.
DefaultMaxSweepAttempts = 10
// DefaultDeadlineDelta defines a default deadline delta (1 week) to be
// used when sweeping inputs with no deadline pressure.
//
// TODO(yy): make this configurable.
DefaultDeadlineDelta = int32(1008)
)
// Params contains the parameters that control the sweeping process.
@@ -317,6 +322,10 @@ type UtxoSweeper struct {
// currentHeight is the best known height of the main chain. This is
// updated whenever a new block epoch is received.
currentHeight int32
// bumpResultChan is a channel that receives broadcast results from the
// TxPublisher.
bumpResultChan chan *BumpResult
}
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
@@ -364,6 +373,10 @@ type UtxoSweeperConfig struct {
// Aggregator is used to group inputs into clusters based on its
// implemention-specific strategy.
Aggregator UtxoAggregator
// Publisher is used to publish the sweep tx crafted here and monitors
// it for potential fee bumps.
Publisher Bumper
}
// Result is the struct that is pushed through the result channel. Callers can
@@ -397,6 +410,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
pendingSweepsReqs: make(chan *pendingSweepsReq),
quit: make(chan struct{}),
pendingInputs: make(pendingInputs),
bumpResultChan: make(chan *BumpResult, 100),
}
}
@@ -670,11 +684,16 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
err: err,
}
// A new block comes in, update the bestHeight.
//
// TODO(yy): this is where we check our published transactions
// and perform RBF if needed. We'd also like to consult our fee
// bumper to get an updated fee rate.
case result := <-s.bumpResultChan:
// Handle the bump event.
err := s.handleBumpEvent(result)
if err != nil {
log.Errorf("Failed to handle bump event: %v",
err)
}
// A new block comes in, update the bestHeight, perform a check
// over all pending inputs and publish sweeping txns if needed.
case epoch, ok := <-blockEpochs:
if !ok {
// We should stop the sweeper before stopping
@@ -779,8 +798,8 @@ func (s *UtxoSweeper) signalResult(pi *pendingInput, result Result) {
}
}
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
// tx. The output address is only marked as used if the publish succeeds.
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
// the tx. The output address is only marked as used if the publish succeeds.
func (s *UtxoSweeper) sweep(set InputSet) error {
// Generate an output script if there isn't an unused script available.
if s.currentOutputScript == nil {
@@ -791,20 +810,21 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
s.currentOutputScript = pkScript
}
// Create sweep tx.
tx, fee, err := createSweepTx(
set.Inputs(), nil, s.currentOutputScript,
uint32(s.currentHeight), set.FeeRate(),
s.cfg.MaxFeeRate.FeePerKWeight(), s.cfg.Signer,
)
if err != nil {
return fmt.Errorf("create sweep tx: %w", err)
}
// Create a default deadline height, and replace it with set's
// DeadlineHeight if it's set.
deadlineHeight := s.currentHeight + DefaultDeadlineDelta
deadlineHeight = set.DeadlineHeight().UnwrapOr(deadlineHeight)
tr := &TxRecord{
Txid: tx.TxHash(),
FeeRate: uint64(set.FeeRate()),
Fee: uint64(fee),
// Create a fee bump request and ask the publisher to broadcast it. The
// publisher will then take over and start monitoring the tx for
// potential fee bump.
req := &BumpRequest{
Inputs: set.Inputs(),
Budget: set.Budget(),
DeadlineHeight: deadlineHeight,
DeliveryAddress: s.currentOutputScript,
MaxFeeRate: s.cfg.MaxFeeRate.FeePerKWeight(),
// TODO(yy): pass the strategy here.
}
// Reschedule the inputs that we just tried to sweep. This is done in
@@ -812,13 +832,9 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
// publish attempts and rescue them in the next sweep.
s.markInputsPendingPublish(set)
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
tx.TxHash(), len(tx.TxIn), s.currentHeight)
// Publish the sweeping tx with customized label.
err = s.cfg.Wallet.PublishTransaction(
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
)
// Broadcast will return a read-only chan that we will listen to for
// this publish result and future RBF attempt.
resp, err := s.cfg.Publisher.Broadcast(req)
if err != nil {
outpoints := make([]wire.OutPoint, len(set.Inputs()))
for i, inp := range set.Inputs() {
@@ -831,16 +847,11 @@ func (s *UtxoSweeper) sweep(set InputSet) error {
return err
}
// Inputs have been successfully published so we update their states.
err = s.markInputsPublished(tr, tx.TxIn)
if err != nil {
return err
}
// If there's no error, remove the output script. Otherwise keep it so
// that it can be reused for the next transaction and causes no address
// inflation.
s.currentOutputScript = nil
// Successfully sent the broadcast attempt, we now handle the result by
// subscribing to the result chan and listen for future updates about
// this tx.
s.wg.Add(1)
go s.monitorFeeBumpResult(resp)
return nil
}
@@ -1557,3 +1568,167 @@ func (s *UtxoSweeper) sweepPendingInputs(inputs pendingInputs) {
}
}
}
// monitorFeeBumpResult subscribes to the passed result chan to listen for
// future updates about the sweeping tx.
//
// NOTE: must run as a goroutine.
func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) {
defer s.wg.Done()
for {
select {
case r := <-resultChan:
// Validate the result is valid.
if err := r.Validate(); err != nil {
log.Errorf("Received invalid result: %v", err)
continue
}
// Send the result back to the main event loop.
select {
case s.bumpResultChan <- r:
case <-s.quit:
log.Debug("Sweeper shutting down, skip " +
"sending bump result")
return
}
// The sweeping tx has been confirmed, we can exit the
// monitor now.
//
// TODO(yy): can instead remove the spend subscription
// in sweeper and rely solely on this event to mark
// inputs as Swept?
if r.Event == TxConfirmed || r.Event == TxFailed {
log.Debugf("Received %v for sweep tx %v, exit "+
"fee bump monitor", r.Event,
r.Tx.TxHash())
return
}
case <-s.quit:
log.Debugf("Sweeper shutting down, exit fee " +
"bump handler")
return
}
}
}
// handleBumpEventTxFailed handles the case where the tx has been failed to
// publish.
func (s *UtxoSweeper) handleBumpEventTxFailed(r *BumpResult) error {
tx, err := r.Tx, r.Err
log.Errorf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err)
outpoints := make([]wire.OutPoint, 0, len(tx.TxIn))
for _, inp := range tx.TxIn {
outpoints = append(outpoints, inp.PreviousOutPoint)
}
// TODO(yy): should we also remove the failed tx from db?
s.markInputsPublishFailed(outpoints)
return err
}
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
// replaced by a new one.
func (s *UtxoSweeper) handleBumpEventTxReplaced(r *BumpResult) error {
oldTx := r.ReplacedTx
newTx := r.Tx
// Prepare a new record to replace the old one.
tr := &TxRecord{
Txid: newTx.TxHash(),
FeeRate: uint64(r.FeeRate),
Fee: uint64(r.Fee),
}
// Get the old record for logging purpose.
oldTxid := oldTx.TxHash()
record, err := s.cfg.Store.GetTx(oldTxid)
if err != nil {
log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
return err
}
log.Infof("RBFed tx=%v(fee=%v, feerate=%v) with new tx=%v(fee=%v, "+
"feerate=%v)", record.Txid, record.Fee, record.FeeRate,
tr.Txid, tr.Fee, tr.FeeRate)
// The old sweeping tx has been replaced by a new one, we will update
// the tx record in the sweeper db.
//
// TODO(yy): we may also need to update the inputs in this tx to a new
// state. Suppose a replacing tx only spends a subset of the inputs
// here, we'd end up with the rest being marked as `StatePublished` and
// won't be aggregated in the next sweep. Atm it's fine as we always
// RBF the same input set.
if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
log.Errorf("Delete tx record for %v: %v", oldTxid, err)
return err
}
// Mark the inputs as published using the replacing tx.
return s.markInputsPublished(tr, r.Tx.TxIn)
}
// handleBumpEventTxPublished handles the case where the sweeping tx has been
// successfully published.
func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error {
tx := r.Tx
tr := &TxRecord{
Txid: tx.TxHash(),
FeeRate: uint64(r.FeeRate),
Fee: uint64(r.Fee),
}
// Inputs have been successfully published so we update their
// states.
err := s.markInputsPublished(tr, tx.TxIn)
if err != nil {
return err
}
log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
tx.TxHash(), len(tx.TxIn), s.currentHeight)
// If there's no error, remove the output script. Otherwise
// keep it so that it can be reused for the next transaction
// and causes no address inflation.
s.currentOutputScript = nil
return nil
}
// handleBumpEvent handles the result sent from the bumper based on its event
// type.
//
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
// input's spending event, we don't need to do anything here.
func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error {
log.Debugf("Received bump event [%v] for tx %v", r.Event, r.Tx.TxHash())
switch r.Event {
// The tx has been published, we update the inputs' state and create a
// record to be stored in the sweeper db.
case TxPublished:
return s.handleBumpEventTxPublished(r)
// The tx has failed, we update the inputs' state.
case TxFailed:
return s.handleBumpEventTxFailed(r)
// The tx has been replaced, we will remove the old tx and replace it
// with the new one.
case TxReplaced:
return s.handleBumpEventTxReplaced(r)
}
return nil
}