sweep: remove block subscription in UtxoSweeper and TxPublisher

This commit removes the independent block subscriptions in `UtxoSweeper`
and `TxPublisher`. These subsystems now listen to the `BlockbeatChan`
for new blocks.
This commit is contained in:
yyforyongyu
2024-06-04 20:31:03 +08:00
parent 801fd6b85b
commit e113f39d26
2 changed files with 18 additions and 55 deletions

View File

@@ -802,13 +802,8 @@ func (t *TxPublisher) Start() error {
return fmt.Errorf("TxPublisher started more than once")
}
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
t.wg.Add(1)
go t.monitor(blockEvent)
go t.monitor()
log.Debugf("TxPublisher started")
@@ -836,33 +831,25 @@ func (t *TxPublisher) Stop() error {
// to be bumped. If so, it will attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine.
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
defer blockEvent.Cancel()
func (t *TxPublisher) monitor() {
defer t.wg.Done()
for {
select {
case epoch, ok := <-blockEvent.Epochs:
if !ok {
// We should stop the publisher before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed, exit " +
"monitor")
return
}
log.Debugf("TxPublisher received new block: %v",
epoch.Height)
case beat := <-t.BlockbeatChan:
height := beat.Height()
log.Debugf("TxPublisher received new block: %v", height)
// Update the best known height for the publisher.
t.currentHeight.Store(epoch.Height)
t.currentHeight.Store(height)
// Check all monitored txns to see if any of them needs
// to be bumped.
t.processRecords()
// Notify we've processed the block.
t.NotifyBlockProcessed(beat, nil)
case <-t.quit:
log.Debug("Fee bumper stopped, exit monitor")
return

View File

@@ -452,21 +452,12 @@ func (s *UtxoSweeper) Start() error {
// not change from here on.
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
// We need to register for block epochs and retry sweeping every block.
// We should get a notification with the current best block immediately
// if we don't provide any epoch. We'll wait for that in the collector.
blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
// Start sweeper main loop.
s.wg.Add(1)
go func() {
defer blockEpochs.Cancel()
defer s.wg.Done()
s.collector(blockEpochs.Epochs)
s.collector()
// The collector exited and won't longer handle incoming
// requests. This can happen on shutdown, when the block
@@ -657,17 +648,8 @@ func (s *UtxoSweeper) removeConflictSweepDescendants(
// collector is the sweeper main loop. It processes new inputs, spend
// notifications and counts down to publication of the sweep tx.
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// We registered for the block epochs with a nil request. The notifier
// should send us the current best block immediately. So we need to wait
// for it here because we need to know the current best height.
select {
case bestBlock := <-blockEpochs:
s.currentHeight = bestBlock.Height
case <-s.quit:
return
}
func (s *UtxoSweeper) collector() {
defer s.wg.Done()
for {
// Clean inputs, which will remove inputs that are swept,
@@ -737,25 +719,16 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// A new block comes in, update the bestHeight, perform a check
// over all pending inputs and publish sweeping txns if needed.
case epoch, ok := <-blockEpochs:
if !ok {
// We should stop the sweeper before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed")
return
}
case beat := <-s.BlockbeatChan:
// Update the sweeper to the best height.
s.currentHeight = epoch.Height
s.currentHeight = beat.Height()
// Update the inputs with the latest height.
inputs := s.updateSweeperInputs()
log.Debugf("Received new block: height=%v, attempt "+
"sweeping %d inputs:\n%s",
epoch.Height, len(inputs),
s.currentHeight, len(inputs),
lnutils.NewLogClosure(func() string {
inps := make(
[]input.Input, 0, len(inputs),
@@ -770,6 +743,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// Attempt to sweep any pending inputs.
s.sweepPendingInputs(inputs)
// Notify we've processed the block.
s.NotifyBlockProcessed(beat, nil)
case <-s.quit:
return
}