multi: implement Consumer on subsystems

This commit implements `Consumer` on `TxPublisher`, `UtxoSweeper`,
`ChainArbitrator` and `ChannelArbitrator`.
This commit is contained in:
yyforyongyu 2024-10-29 21:23:29 +08:00
parent b5a3a27c77
commit 801fd6b85b
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
4 changed files with 78 additions and 4 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
@ -244,6 +245,10 @@ type ChainArbitrator struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
// Embed the blockbeat consumer struct to get access to the method
// `NotifyBlockProcessed` and the `BlockbeatChan`.
chainio.BeatConsumer
sync.Mutex
// activeChannels is a map of all the active contracts that are still
@ -272,15 +277,23 @@ type ChainArbitrator struct {
func NewChainArbitrator(cfg ChainArbitratorConfig,
db *channeldb.DB) *ChainArbitrator {
return &ChainArbitrator{
c := &ChainArbitrator{
cfg: cfg,
activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
activeWatchers: make(map[wire.OutPoint]*chainWatcher),
chanSource: db,
quit: make(chan struct{}),
}
// Mount the block consumer.
c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
return c
}
// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*ChainArbitrator)(nil)
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
@ -1361,3 +1374,8 @@ func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
// TODO(roasbeef): arbitration reports
// * types: contested, waiting for success conf, etc
// NOTE: part of the `chainio.Consumer` interface.
func (c *ChainArbitrator) Name() string {
return "ChainArbitrator"
}

View File

@ -14,6 +14,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/graph/db/models"
@ -330,6 +331,10 @@ type ChannelArbitrator struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
// Embed the blockbeat consumer struct to get access to the method
// `NotifyBlockProcessed` and the `BlockbeatChan`.
chainio.BeatConsumer
// startTimestamp is the time when this ChannelArbitrator was started.
startTimestamp time.Time
@ -404,7 +409,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
}
return &ChannelArbitrator{
c := &ChannelArbitrator{
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
@ -415,8 +420,16 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
cfg: cfg,
quit: make(chan struct{}),
}
// Mount the block consumer.
c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
return c
}
// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*ChannelArbitrator)(nil)
// chanArbStartState contains the information from disk that we need to start
// up a channel arbitrator.
type chanArbStartState struct {
@ -3131,6 +3144,13 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
}
}
// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.
func (c *ChannelArbitrator) Name() string {
return fmt.Sprintf("ChannelArbitrator(%v)", c.cfg.ChanPoint)
}
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
// a breach transaction before the channel arbitrator launched its own breach
// resolver. StateContractClosed is returned if this is a modern breach close

View File

@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/input"
@ -344,6 +345,10 @@ type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool
// Embed the blockbeat consumer struct to get access to the method
// `NotifyBlockProcessed` and the `BlockbeatChan`.
chainio.BeatConsumer
wg sync.WaitGroup
// cfg specifies the configuration of the TxPublisher.
@ -371,14 +376,22 @@ type TxPublisher struct {
// Compile-time constraint to ensure TxPublisher implements Bumper.
var _ Bumper = (*TxPublisher)(nil)
// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*TxPublisher)(nil)
// NewTxPublisher creates a new TxPublisher.
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
return &TxPublisher{
tp := &TxPublisher{
cfg: &cfg,
records: lnutils.SyncMap[uint64, *monitorRecord]{},
subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
quit: make(chan struct{}),
}
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
return tp
}
// isNeutrinoBackend checks if the wallet backend is neutrino.
@ -427,6 +440,11 @@ func (t *TxPublisher) storeInitialRecord(req *BumpRequest) (
return requestID, record
}
// NOTE: part of the `chainio.Consumer` interface.
func (t *TxPublisher) Name() string {
return "TxPublisher"
}
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
// succeeded, the initial tx is stored in the records map.
func (t *TxPublisher) initializeTx(requestID uint64, req *BumpRequest) error {

View File

@ -10,6 +10,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/input"
@ -308,6 +309,10 @@ type UtxoSweeper struct {
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
// Embed the blockbeat consumer struct to get access to the method
// `NotifyBlockProcessed` and the `BlockbeatChan`.
chainio.BeatConsumer
cfg *UtxoSweeperConfig
newInputs chan *sweepInputMessage
@ -342,6 +347,9 @@ type UtxoSweeper struct {
bumpRespChan chan *bumpResp
}
// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*UtxoSweeper)(nil)
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
type UtxoSweeperConfig struct {
// GenSweepScript generates a P2WKH script belonging to the wallet where
@ -415,7 +423,7 @@ type sweepInputMessage struct {
// New returns a new Sweeper instance.
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
return &UtxoSweeper{
s := &UtxoSweeper{
cfg: cfg,
newInputs: make(chan *sweepInputMessage),
spendChan: make(chan *chainntnfs.SpendDetail),
@ -425,6 +433,11 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
inputs: make(InputsMap),
bumpRespChan: make(chan *bumpResp, 100),
}
// Mount the block consumer.
s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
return s
}
// Start starts the process of constructing and publish sweep txes.
@ -508,6 +521,11 @@ func (s *UtxoSweeper) Stop() error {
return nil
}
// NOTE: part of the `chainio.Consumer` interface.
func (s *UtxoSweeper) Name() string {
return "UtxoSweeper"
}
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
// swept after the batch time window ends. A custom fee preference can be
// provided to determine what fee rate should be used for the input. Note that