sweep: expand sweeper store to also save RBF-related info

This commit modifies the sweeper store to save a `TxRecord` in db
instead of an empty byte slice. This record will later be used to bring
RBF-awareness to our sweeper.
This commit is contained in:
yyforyongyu
2023-10-25 18:45:25 +08:00
parent 1ace1fdf4c
commit b37444d0de
4 changed files with 182 additions and 42 deletions

View File

@@ -4,17 +4,19 @@ import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)
var (
// txHashesBucketKey is the key that points to a bucket containing the
// hashes of all sweep txes that were published successfully.
//
// maps: txHash -> empty slice
// maps: txHash -> TxRecord
txHashesBucketKey = []byte("sweeper-tx-hashes")
// utxnChainPrefix is the bucket prefix for nursery buckets.
@@ -31,8 +33,90 @@ var (
byteOrder = binary.BigEndian
errNoTxHashesBucket = errors.New("tx hashes bucket does not exist")
// ErrTxNotFound is returned when querying using a txid that's not
// found in our db.
ErrTxNotFound = errors.New("tx not found")
)
// TxRecord specifies a record of a tx that's stored in the database.
type TxRecord struct {
// Txid is the sweeping tx's txid, which is used as the key to store
// the following values.
Txid chainhash.Hash
// FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
FeeRate uint64
// Fee is the fee of the sweeping tx, unit is sat.
Fee uint64
// Published indicates whether the tx has been published.
Published bool
}
// toTlvStream converts TxRecord into a tlv representation.
func (t *TxRecord) toTlvStream() (*tlv.Stream, error) {
const (
// A set of tlv type definitions used to serialize TxRecord.
// We define it here instead of the head of the file to avoid
// naming conflicts.
//
// NOTE: A migration should be added whenever the existing type
// changes.
//
// NOTE: Txid is stored as the key, so it's not included here.
feeRateType tlv.Type = 0
feeType tlv.Type = 1
boolType tlv.Type = 2
)
return tlv.NewStream(
tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate),
tlv.MakeBigSizeRecord(feeType, &t.Fee),
tlv.MakePrimitiveRecord(boolType, &t.Published),
)
}
// serializeTxRecord serializes a TxRecord based on tlv format.
func serializeTxRecord(w io.Writer, tx *TxRecord) error {
// Create the tlv stream.
tlvStream, err := tx.toTlvStream()
if err != nil {
return err
}
// Encode the tlv stream.
var buf bytes.Buffer
if err := tlvStream.Encode(&buf); err != nil {
return err
}
// Write the tlv stream.
if _, err = w.Write(buf.Bytes()); err != nil {
return err
}
return nil
}
// deserializeTxRecord deserializes a TxRecord based on tlv format.
func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
var tx TxRecord
// Create the tlv stream.
tlvStream, err := tx.toTlvStream()
if err != nil {
return nil, err
}
if err := tlvStream.Decode(r); err != nil {
return nil, err
}
return &tx, nil
}
// SweeperStore stores published txes.
type SweeperStore interface {
// IsOurTx determines whether a tx is published by us, based on its
@@ -40,7 +124,7 @@ type SweeperStore interface {
IsOurTx(hash chainhash.Hash) (bool, error)
// StoreTx stores a tx hash we are about to publish.
StoreTx(chainhash.Hash) error
StoreTx(*TxRecord) error
// ListSweeps lists all the sweeps we have successfully published.
ListSweeps() ([]chainhash.Hash, error)
@@ -83,6 +167,8 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
// is not implemented as a database migration, to keep the downgrade path open.
//
// TODO(yy): delete this function once nursery is removed.
func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
chainHash *chainhash.Hash) error {
@@ -138,7 +224,24 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
log.Debugf("Inserting nursery tx %v in hash list "+
"(height=%v)", hash, byteOrder.Uint32(k))
return txHashesBucket.Put(hash[:], []byte{})
// Create the transaction record. Since this is an old record,
// we can assume it's already been published. Although it's
// possible to calculate the fees and fee rate used here, we
// skip it as it's unlikely we'd perform RBF on these old
// sweeping transactions.
tr := &TxRecord{
Txid: hash,
Published: true,
}
// Serialize tx record.
var b bytes.Buffer
err = serializeTxRecord(&b, tr)
if err != nil {
return err
}
return txHashesBucket.Put(tr.Txid[:], b.Bytes())
})
if err != nil {
return err
@@ -148,15 +251,21 @@ func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
}
// StoreTx stores that we are about to publish a tx.
func (s *sweeperStore) StoreTx(txid chainhash.Hash) error {
func (s *sweeperStore) StoreTx(tr *TxRecord) error {
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
if txHashesBucket == nil {
return errNoTxHashesBucket
}
return txHashesBucket.Put(txid[:], []byte{})
// Serialize tx record.
var b bytes.Buffer
err := serializeTxRecord(&b, tr)
if err != nil {
return err
}
return txHashesBucket.Put(tr.Txid[:], b.Bytes())
}, func() {})
}