mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-01 18:27:43 +02:00
lnwallet+sweep: introduce TxPublisher
to handle fee bump
This commit adds `TxPublisher` which implements `Bumper` interface. This is part one of the implementation that focuses on implementing the `Broadcast` method which guarantees a tx can be published with RBF-compliant. It does so by leveraging the `testmempoolaccept` API, keep increasing the fee rate until an RBF-compliant tx is made and broadcasts it. This tx will then be monitored by the `TxPublisher` and in the following commit, the monitoring process will be added.
This commit is contained in:
@@ -3,16 +3,29 @@ package sweep
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
"github.com/btcsuite/btcd/rpcclient"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btcwallet/chain"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/labels"
|
||||
"github.com/lightningnetwork/lnd/lnutils"
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrInvalidBumpResult is returned when the bump result is invalid.
|
||||
ErrInvalidBumpResult = errors.New("invalid bump result")
|
||||
|
||||
// ErrNotEnoughBudget is returned when the fee bumper decides the
|
||||
// current budget cannot cover the fee.
|
||||
ErrNotEnoughBudget = errors.New("not enough budget")
|
||||
)
|
||||
|
||||
// Bumper defines an interface that can be used by other subsystems for fee
|
||||
@@ -165,6 +178,9 @@ type BumpResult struct {
|
||||
|
||||
// Err is the error that occurred during the broadcast.
|
||||
Err error
|
||||
|
||||
// requestID is the ID of the request that created this record.
|
||||
requestID uint64
|
||||
}
|
||||
|
||||
// Validate validates the BumpResult so it's safe to use.
|
||||
@@ -197,3 +213,460 @@ func (b *BumpResult) Validate() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TxPublisherConfig is the config used to create a new TxPublisher.
|
||||
type TxPublisherConfig struct {
|
||||
// Signer is used to create the tx signature.
|
||||
Signer input.Signer
|
||||
|
||||
// Wallet is used primarily to publish the tx.
|
||||
Wallet Wallet
|
||||
|
||||
// Estimator is used to estimate the fee rate for the new tx based on
|
||||
// its deadline conf target.
|
||||
Estimator chainfee.Estimator
|
||||
|
||||
// Notifier is used to monitor the confirmation status of the tx.
|
||||
Notifier chainntnfs.ChainNotifier
|
||||
}
|
||||
|
||||
// TxPublisher is an implementation of the Bumper interface. It utilizes the
|
||||
// `testmempoolaccept` RPC to bump the fee of txns it created based on
|
||||
// different fee function selected or configed by the caller. Its purpose is to
|
||||
// take a list of inputs specified, and create a tx that spends them to a
|
||||
// specified output. It will then monitor the confirmation status of the tx,
|
||||
// and if it's not confirmed within a certain time frame, it will attempt to
|
||||
// bump the fee of the tx by creating a new tx that spends the same inputs to
|
||||
// the same output, but with a higher fee rate. It will continue to do this
|
||||
// until the tx is confirmed or the fee rate reaches the maximum fee rate
|
||||
// specified by the caller.
|
||||
type TxPublisher struct {
|
||||
wg sync.WaitGroup
|
||||
|
||||
// cfg specifies the configuration of the TxPublisher.
|
||||
cfg *TxPublisherConfig
|
||||
|
||||
// currentHeight is the current block height.
|
||||
currentHeight int32
|
||||
|
||||
// records is a map keyed by the requestCounter and the value is the tx
|
||||
// being monitored.
|
||||
records lnutils.SyncMap[uint64, *monitorRecord]
|
||||
|
||||
// requestCounter is a monotonically increasing counter used to keep
|
||||
// track of how many requests have been made.
|
||||
requestCounter atomic.Uint64
|
||||
|
||||
// subscriberChans is a map keyed by the requestCounter, each item is
|
||||
// the chan that the publisher sends the fee bump result to.
|
||||
subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
|
||||
|
||||
// quit is used to signal the publisher to stop.
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// Compile-time constraint to ensure TxPublisher implements Bumper.
|
||||
var _ Bumper = (*TxPublisher)(nil)
|
||||
|
||||
// NewTxPublisher creates a new TxPublisher.
|
||||
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
|
||||
return &TxPublisher{
|
||||
cfg: &cfg,
|
||||
records: lnutils.SyncMap[uint64, *monitorRecord]{},
|
||||
subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast is used to publish the tx created from the given inputs. It will,
|
||||
// 1. init a fee function based on the given strategy.
|
||||
// 2. create an RBF-compliant tx and monitor it for confirmation.
|
||||
// 3. notify the initial broadcast result back to the caller.
|
||||
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
|
||||
// specified cannot cover the fee.
|
||||
//
|
||||
// NOTE: part of the Bumper interface.
|
||||
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
|
||||
log.Tracef("Received broadcast request: %s", newLogClosure(
|
||||
func() string {
|
||||
return spew.Sdump(req)
|
||||
})())
|
||||
|
||||
// Attempt an initial broadcast which is guaranteed to comply with the
|
||||
// RBF rules.
|
||||
result, err := t.initialBroadcast(req)
|
||||
if err != nil {
|
||||
log.Errorf("Initial broadcast failed: %v", err)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create a chan to send the result to the caller.
|
||||
subscriber := make(chan *BumpResult, 1)
|
||||
t.subscriberChans.Store(result.requestID, subscriber)
|
||||
|
||||
// Send the initial broadcast result to the caller.
|
||||
t.handleResult(result)
|
||||
|
||||
return subscriber, nil
|
||||
}
|
||||
|
||||
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
|
||||
// broadcasts it.
|
||||
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
|
||||
// Create a fee bumping algorithm to be used for future RBF.
|
||||
feeAlgo, err := t.initializeFeeFunction(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init fee function: %w", err)
|
||||
}
|
||||
|
||||
// Create the initial tx to be broadcasted. This tx is guaranteed to
|
||||
// comply with the RBF restrictions.
|
||||
requestID, err := t.createRBFCompliantTx(req, feeAlgo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
|
||||
}
|
||||
|
||||
// Broadcast the tx and return the monitored record.
|
||||
result, err := t.broadcast(requestID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("broadcast sweep tx: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// initializeFeeFunction initializes a fee function to be used for this request
|
||||
// for future fee bumping.
|
||||
func (t *TxPublisher) initializeFeeFunction(
|
||||
req *BumpRequest) (FeeFunction, error) {
|
||||
|
||||
// Get the max allowed feerate.
|
||||
maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get the initial conf target.
|
||||
confTarget := calcCurrentConfTarget(t.currentHeight, req.DeadlineHeight)
|
||||
|
||||
// Initialize the fee function and return it.
|
||||
//
|
||||
// TODO(yy): return based on differet req.Strategy?
|
||||
return NewLinearFeeFunction(
|
||||
maxFeeRateAllowed, confTarget, t.cfg.Estimator,
|
||||
)
|
||||
}
|
||||
|
||||
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
|
||||
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
|
||||
// and redo the process until the tx is valid, or return an error when non-RBF
|
||||
// related errors occur or the budget has been used up.
|
||||
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
|
||||
f FeeFunction) (uint64, error) {
|
||||
|
||||
for {
|
||||
// Create a new tx with the given fee rate and check its
|
||||
// mempool acceptance.
|
||||
tx, fee, err := t.createAndCheckTx(req, f)
|
||||
|
||||
switch {
|
||||
case err == nil:
|
||||
// The tx is valid, return the request ID.
|
||||
requestID := t.storeRecord(tx, req, f, fee)
|
||||
|
||||
log.Infof("Created tx %v for %v inputs: feerate=%v, "+
|
||||
"fee=%v, inputs=%v", tx.TxHash(),
|
||||
len(req.Inputs), f.FeeRate(), fee,
|
||||
inputTypeSummary(req.Inputs))
|
||||
|
||||
return requestID, nil
|
||||
|
||||
// If the error indicates the fees paid is not enough, we will
|
||||
// ask the fee function to increase the fee rate and retry.
|
||||
case errors.Is(err, lnwallet.ErrMempoolFee):
|
||||
// We should at least start with a feerate above the
|
||||
// mempool min feerate, so if we get this error, it
|
||||
// means something is wrong earlier in the pipeline.
|
||||
log.Errorf("Current fee=%v, feerate=%v, %v", fee,
|
||||
f.FeeRate(), err)
|
||||
|
||||
fallthrough
|
||||
|
||||
// We are not paying enough fees so we increase it.
|
||||
case errors.Is(err, rpcclient.ErrInsufficientFee):
|
||||
increased := false
|
||||
|
||||
// Keep calling the fee function until the fee rate is
|
||||
// increased or maxed out.
|
||||
for !increased {
|
||||
log.Debugf("Increasing fee for next round, "+
|
||||
"current fee=%v, feerate=%v", fee,
|
||||
f.FeeRate())
|
||||
|
||||
// If the fee function tells us that we have
|
||||
// used up the budget, we will return an error
|
||||
// indicating this tx cannot be made. The
|
||||
// sweeper should handle this error and try to
|
||||
// cluster these inputs differetly.
|
||||
increased, err = f.Increment()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yy): suppose there's only one bad input, we can do a
|
||||
// binary search to find out which input is causing this error
|
||||
// by recreating a tx using half of the inputs and check its
|
||||
// mempool acceptance.
|
||||
default:
|
||||
log.Debugf("Failed to create RBF-compliant tx: %v", err)
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// storeRecord stores the given record in the records map.
|
||||
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
|
||||
f FeeFunction, fee btcutil.Amount) uint64 {
|
||||
|
||||
// Increase the request counter.
|
||||
//
|
||||
// NOTE: this is the only place where we increase the
|
||||
// counter.
|
||||
requestID := t.requestCounter.Add(1)
|
||||
|
||||
// Register the record.
|
||||
t.records.Store(requestID, &monitorRecord{
|
||||
tx: tx,
|
||||
req: req,
|
||||
feeFunction: f,
|
||||
fee: fee,
|
||||
})
|
||||
|
||||
return requestID
|
||||
}
|
||||
|
||||
// createAndCheckTx creates a tx based on the given inputs, change output
|
||||
// script, and the fee rate. In addition, it validates the tx's mempool
|
||||
// acceptance before returning a tx that can be published directly, along with
|
||||
// its fee.
|
||||
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
|
||||
*wire.MsgTx, btcutil.Amount, error) {
|
||||
|
||||
// Create the sweep tx with max fee rate of 0 as the fee function
|
||||
// guarantees the fee rate used here won't exceed the max fee rate.
|
||||
//
|
||||
// TODO(yy): refactor this function to not require a max fee rate.
|
||||
tx, fee, err := createSweepTx(
|
||||
req.Inputs, nil, req.DeliveryAddress, uint32(t.currentHeight),
|
||||
f.FeeRate(), 0, t.cfg.Signer,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("create sweep tx: %w", err)
|
||||
}
|
||||
|
||||
// Sanity check the budget still covers the fee.
|
||||
if fee > req.Budget {
|
||||
return nil, 0, fmt.Errorf("%w: budget=%v, fee=%v",
|
||||
ErrNotEnoughBudget, req.Budget, fee)
|
||||
}
|
||||
|
||||
// Validate the tx's mempool acceptance.
|
||||
err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
|
||||
|
||||
// Exit early if the tx is valid.
|
||||
if err == nil {
|
||||
return tx, fee, nil
|
||||
}
|
||||
|
||||
// Print an error log if the chain backend doesn't support the mempool
|
||||
// acceptance test RPC.
|
||||
if errors.Is(err, rpcclient.ErrBackendVersion) {
|
||||
log.Errorf("TestMempoolAccept not supported by backend, " +
|
||||
"consider upgrading it to a newer version")
|
||||
return tx, fee, nil
|
||||
}
|
||||
|
||||
// We are running on a backend that doesn't implement the RPC
|
||||
// testmempoolaccept, eg, neutrino, so we'll skip the check.
|
||||
if errors.Is(err, chain.ErrUnimplemented) {
|
||||
log.Debug("Skipped testmempoolaccept due to not implemented")
|
||||
return tx, fee, nil
|
||||
}
|
||||
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// broadcast takes a monitored tx and publishes it to the network. Prior to the
|
||||
// broadcast, it will subscribe the tx's confirmation notification and attach
|
||||
// the event channel to the record. Any broadcast-related errors will not be
|
||||
// returned here, instead, they will be put inside the `BumpResult` and
|
||||
// returned to the caller.
|
||||
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
|
||||
// Get the record being monitored.
|
||||
record, ok := t.records.Load(requestID)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("tx record %v not found", requestID)
|
||||
}
|
||||
|
||||
txid := record.tx.TxHash()
|
||||
|
||||
// Subscribe to its confirmation notification.
|
||||
confEvent, err := t.cfg.Notifier.RegisterConfirmationsNtfn(
|
||||
&txid, nil, 1, uint32(t.currentHeight),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("register confirmation ntfn: %w", err)
|
||||
}
|
||||
|
||||
// Attach the confirmation event channel to the record.
|
||||
record.confEvent = confEvent
|
||||
|
||||
tx := record.tx
|
||||
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
|
||||
txid, len(tx.TxIn), t.currentHeight)
|
||||
|
||||
// Set the event, and change it to TxFailed if the wallet fails to
|
||||
// publish it.
|
||||
event := TxPublished
|
||||
|
||||
// Publish the sweeping tx with customized label. If the publish fails,
|
||||
// this error will be saved in the `BumpResult` and it will be removed
|
||||
// from being monitored.
|
||||
err = t.cfg.Wallet.PublishTransaction(
|
||||
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
|
||||
)
|
||||
if err != nil {
|
||||
// NOTE: we decide to attach this error to the result instead
|
||||
// of returning it here because by the time the tx reaches
|
||||
// here, it should have passed the mempool acceptance check. If
|
||||
// it still fails to be broadcast, it's likely a non-RBF
|
||||
// related error happened. So we send this error back to the
|
||||
// caller so that it can handle it properly.
|
||||
//
|
||||
// TODO(yy): find out which input is causing the failure.
|
||||
log.Errorf("Failed to publish tx %v: %v", txid, err)
|
||||
event = TxFailed
|
||||
}
|
||||
|
||||
result := &BumpResult{
|
||||
Event: event,
|
||||
Tx: record.tx,
|
||||
Fee: record.fee,
|
||||
FeeRate: record.feeFunction.FeeRate(),
|
||||
Err: err,
|
||||
requestID: requestID,
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// notifyResult sends the result to the resultChan specified by the requestID.
|
||||
// This channel is expected to be read by the caller.
|
||||
func (t *TxPublisher) notifyResult(result *BumpResult) {
|
||||
id := result.requestID
|
||||
subscriber, ok := t.subscriberChans.Load(id)
|
||||
if !ok {
|
||||
log.Errorf("Result chan for id=%v not found", id)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("Sending result for requestID=%v, tx=%v", id,
|
||||
result.Tx.TxHash())
|
||||
|
||||
select {
|
||||
// Send the result to the subscriber.
|
||||
//
|
||||
// TODO(yy): Add timeout in case it's blocking?
|
||||
case subscriber <- result:
|
||||
case <-t.quit:
|
||||
log.Debug("Fee bumper stopped")
|
||||
}
|
||||
}
|
||||
|
||||
// removeResult removes the tracking of the result if the result contains a
|
||||
// non-nil error, or the tx is confirmed, the record will be removed from the
|
||||
// maps.
|
||||
func (t *TxPublisher) removeResult(result *BumpResult) {
|
||||
id := result.requestID
|
||||
|
||||
// Remove the record from the maps if there's an error. This means this
|
||||
// tx has failed its broadcast and cannot be retried. There are two
|
||||
// cases,
|
||||
// - when the budget cannot cover the fee.
|
||||
// - when a non-RBF related error occurs.
|
||||
switch result.Event {
|
||||
case TxFailed:
|
||||
log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
|
||||
id, result.Tx.TxHash(), result.Err)
|
||||
|
||||
case TxConfirmed:
|
||||
// Remove the record is the tx is confirmed.
|
||||
log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
|
||||
result.Tx.TxHash())
|
||||
|
||||
// Do nothing if it's neither failed or confirmed.
|
||||
default:
|
||||
log.Tracef("Skipping record removal for id=%v, event=%v", id,
|
||||
result.Event)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
t.records.Delete(id)
|
||||
t.subscriberChans.Delete(id)
|
||||
}
|
||||
|
||||
// handleResult handles the result of a tx broadcast. It will notify the
|
||||
// subscriber and remove the record if the tx is confirmed or failed to be
|
||||
// broadcast.
|
||||
func (t *TxPublisher) handleResult(result *BumpResult) {
|
||||
// Notify the subscriber.
|
||||
t.notifyResult(result)
|
||||
|
||||
// Remove the record if it's failed or confirmed.
|
||||
t.removeResult(result)
|
||||
}
|
||||
|
||||
// monitorRecord is used to keep track of the tx being monitored by the
|
||||
// publisher internally.
|
||||
type monitorRecord struct {
|
||||
// tx is the tx being monitored.
|
||||
tx *wire.MsgTx
|
||||
|
||||
// req is the original request.
|
||||
req *BumpRequest
|
||||
|
||||
// confEvent is the subscription to the confirmation event of the tx.
|
||||
confEvent *chainntnfs.ConfirmationEvent
|
||||
|
||||
// feeFunction is the fee bumping algorithm used by the publisher.
|
||||
feeFunction FeeFunction
|
||||
|
||||
// fee is the fee paid by the tx.
|
||||
fee btcutil.Amount
|
||||
}
|
||||
|
||||
// calcCurrentConfTarget calculates the current confirmation target based on
|
||||
// the deadline height. The conf target is capped at 0 if the deadline has
|
||||
// already been past.
|
||||
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
|
||||
var confTarget uint32
|
||||
|
||||
// Calculate how many blocks left until the deadline.
|
||||
deadlineDelta := deadline - currentHeight
|
||||
|
||||
// If we are already past the deadline, we will set the conf target to
|
||||
// be 1.
|
||||
if deadlineDelta <= 0 {
|
||||
log.Warnf("Deadline is %d blocks behind current height %v",
|
||||
-deadlineDelta, currentHeight)
|
||||
|
||||
confTarget = 1
|
||||
} else {
|
||||
confTarget = uint32(deadlineDelta)
|
||||
}
|
||||
|
||||
return confTarget
|
||||
}
|
||||
|
Reference in New Issue
Block a user