sweep: add monitor loop to TxPublisher

This commit finishes the implementation of `TxPublisher` by adding the
monitor process. Whenever a new block arrives, the publisher will check
all its monitored records and attempt fee bumping them if necessary.
This commit is contained in:
yyforyongyu
2024-02-29 19:36:37 +08:00
parent 11f7e455d1
commit 90e727a776
5 changed files with 725 additions and 64 deletions

View File

@@ -7,6 +7,7 @@ import (
"sync/atomic"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
@@ -512,17 +513,6 @@ func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
txid := record.tx.TxHash()
// Subscribe to its confirmation notification.
confEvent, err := t.cfg.Notifier.RegisterConfirmationsNtfn(
&txid, nil, 1, uint32(t.currentHeight),
)
if err != nil {
return nil, fmt.Errorf("register confirmation ntfn: %w", err)
}
// Attach the confirmation event channel to the record.
record.confEvent = confEvent
tx := record.tx
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
txid, len(tx.TxIn), t.currentHeight)
@@ -534,7 +524,7 @@ func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
// Publish the sweeping tx with customized label. If the publish fails,
// this error will be saved in the `BumpResult` and it will be removed
// from being monitored.
err = t.cfg.Wallet.PublishTransaction(
err := t.cfg.Wallet.PublishTransaction(
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
)
if err != nil {
@@ -638,9 +628,6 @@ type monitorRecord struct {
// req is the original request.
req *BumpRequest
// confEvent is the subscription to the confirmation event of the tx.
confEvent *chainntnfs.ConfirmationEvent
// feeFunction is the fee bumping algorithm used by the publisher.
feeFunction FeeFunction
@@ -648,6 +635,283 @@ type monitorRecord struct {
fee btcutil.Amount
}
// Start starts the publisher by subscribing to block epoch updates and kicking
// off the monitor loop.
func (t *TxPublisher) Start() error {
log.Info("TxPublisher starting...")
defer log.Debugf("TxPublisher started")
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
t.wg.Add(1)
go t.monitor(blockEvent)
return nil
}
// Stop stops the publisher and waits for the monitor loop to exit.
func (t *TxPublisher) Stop() {
log.Info("TxPublisher stopping...")
defer log.Debugf("TxPublisher stopped")
close(t.quit)
t.wg.Wait()
}
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
// it will examine all the txns being monitored, and check if any of them needs
// to be bumped. If so, it will attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine.
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
defer blockEvent.Cancel()
defer t.wg.Done()
for {
select {
case epoch, ok := <-blockEvent.Epochs:
if !ok {
// We should stop the publisher before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed, exit " +
"monitor")
return
}
log.Debugf("TxPublisher received new block: %v",
epoch.Height)
// Update the best known height for the publisher.
t.currentHeight = epoch.Height
// Check all monitored txns to see if any of them needs
// to be bumped.
t.processRecords()
case <-t.quit:
log.Debug("Fee bumper stopped, exit monitor")
return
}
}
}
// processRecords checks all the txns being monitored, and checks if any of
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
func (t *TxPublisher) processRecords() {
// confirmedRecords stores a map of the records which have been
// confirmed.
confirmedRecords := make(map[uint64]*monitorRecord)
// feeBumpRecords stores a map of the records which need to be bumped.
feeBumpRecords := make(map[uint64]*monitorRecord)
// visitor is a helper closure that visits each record and divides them
// into two groups.
visitor := func(requestID uint64, r *monitorRecord) error {
log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
r.tx.TxHash())
// If the tx is already confirmed, we can stop monitoring it.
if t.isConfirmed(r.tx.TxHash()) {
confirmedRecords[requestID] = r
// Move to the next record.
return nil
}
feeBumpRecords[requestID] = r
// Return nil to move to the next record.
return nil
}
// Iterate through all the records and divide them into two groups.
t.records.ForEach(visitor)
// For records that are confirmed, we'll notify the caller about this
// result.
for requestID, r := range confirmedRecords {
rec := r
log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
t.wg.Add(1)
go t.handleTxConfirmed(rec, requestID)
}
// Get the current height to be used in the following goroutines.
currentHeight := t.currentHeight
// For records that are not confirmed, we perform a fee bump if needed.
for requestID, r := range feeBumpRecords {
rec := r
log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
t.wg.Add(1)
go t.handleFeeBumpTx(requestID, rec, currentHeight)
}
}
// handleTxConfirmed is called when a monitored tx is confirmed. It will
// notify the subscriber then remove the record from the maps .
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
defer t.wg.Done()
// Create a result that will be sent to the resultChan which is
// listened by the caller.
result := &BumpResult{
Event: TxConfirmed,
Tx: r.tx,
requestID: requestID,
Fee: r.fee,
FeeRate: r.feeFunction.FeeRate(),
}
// Notify that this tx is confirmed and remove the record from the map.
t.handleResult(result)
}
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
// attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
currentHeight int32) {
defer t.wg.Done()
oldTxid := r.tx.TxHash()
// Get the current conf target for this record.
confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
// Ask the fee function whether a bump is needed. We expect the fee
// function to increase its returned fee rate after calling this
// method.
increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
if err != nil {
// TODO(yy): send this error back to the sweeper so it can
// re-group the inputs?
log.Errorf("Failed to increase fee rate for tx %v at "+
"height=%v: %v", oldTxid, t.currentHeight, err)
return
}
// If the fee rate was not increased, there's no need to bump the fee.
if !increased {
log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
t.currentHeight)
return
}
// The fee function now has a new fee rate, we will use it to bump the
// fee of the tx.
result, err := t.createAndPublishTx(requestID, r)
if err != nil {
log.Errorf("Failed to bump tx %v: %v", oldTxid, err)
return
}
// Notify the new result.
t.handleResult(result)
}
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
// to the network. It will update the record with the new tx and fee rate if
// successfully created, and return the result when published successfully.
func (t *TxPublisher) createAndPublishTx(requestID uint64,
r *monitorRecord) (*BumpResult, error) {
// Fetch the old tx.
oldTx := r.tx
// Create a new tx with the new fee rate.
//
// NOTE: The fee function is expected to have increased its returned
// fee rate after calling the SkipFeeBump method. So we can use it
// directly here.
tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
// If the tx doesn't not have enought budget, we will return a result
// so the sweeper can handle it by re-clustering the utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
return &BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: requestID,
}, nil
}
// If the error is not budget related, we will return an error and let
// the fee bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if err != nil {
log.Infof("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return nil, err
}
// Register a new record by overwriting the same requestID.
t.records.Store(requestID, &monitorRecord{
tx: tx,
req: r.req,
feeFunction: r.feeFunction,
fee: fee,
})
// Attempt to broadcast this new tx.
result, err := t.broadcast(requestID)
if err != nil {
return nil, err
}
// A successful replacement tx is created, attach the old tx.
result.ReplacedTx = oldTx
// If the new tx failed to be published, we will return the result so
// the caller can handle it.
if result.Event == TxFailed {
return result, nil
}
log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
// Otherwise, it's a successful RBF, set the event and return.
result.Event = TxReplaced
return result, nil
}
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
if err != nil {
log.Warnf("Failed to get tx details for %v: %v", txid, err)
return false
}
return details.NumConfirmations > 0
}
// calcCurrentConfTarget calculates the current confirmation target based on
// the deadline height. The conf target is capped at 0 if the deadline has
// already been past.