contractcourt: remove block subscription in chain arbitrator

This commit removes the block subscriptions used in `ChainArbitrator`
and replaced them with the blockbeat managed by `BlockbeatDispatcher`.
This commit is contained in:
yyforyongyu
2024-10-29 21:48:13 +08:00
parent 5f9d473702
commit 045f8432b7
3 changed files with 38 additions and 90 deletions

View File

@@ -36,7 +36,7 @@ import (
) )
var ( var (
defaultTimeout = 30 * time.Second defaultTimeout = 10 * time.Second
breachOutPoints = []wire.OutPoint{ breachOutPoints = []wire.OutPoint{
{ {

View File

@@ -267,6 +267,9 @@ type ChainArbitrator struct {
// active channels that it must still watch over. // active channels that it must still watch over.
chanSource *channeldb.DB chanSource *channeldb.DB
// beat is the current best known blockbeat.
beat chainio.Blockbeat
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@@ -797,18 +800,11 @@ func (c *ChainArbitrator) Start() error {
} }
} }
// Subscribe to a single stream of block epoch notifications that we
// will dispatch to all active arbitrators.
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return err
}
// Start our goroutine which will dispatch blocks to each arbitrator. // Start our goroutine which will dispatch blocks to each arbitrator.
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
defer c.wg.Done() defer c.wg.Done()
c.dispatchBlocks(blockEpoch) c.dispatchBlocks()
}() }()
// TODO(roasbeef): eventually move all breach watching here // TODO(roasbeef): eventually move all breach watching here
@@ -816,94 +812,22 @@ func (c *ChainArbitrator) Start() error {
return nil return nil
} }
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
// chanPoint is the funding outpoint of the channel.
chanPoint wire.OutPoint
// blocks is the channel that new block heights are sent into. This
// channel should be sufficiently buffered as to not block the sender.
blocks chan<- int32
// quit is closed if the receiving entity is shutting down.
quit chan struct{}
}
// dispatchBlocks consumes a block epoch notification stream and dispatches // dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function // blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine. // must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks( func (c *ChainArbitrator) dispatchBlocks() {
blockEpoch *chainntnfs.BlockEpochEvent) {
// getRecipients is a helper function which acquires the chain arb
// lock and returns a set of block recipients which can be used to
// dispatch blocks.
getRecipients := func() []blockRecipient {
c.Lock()
blocks := make([]blockRecipient, 0, len(c.activeChannels))
for _, channel := range c.activeChannels {
blocks = append(blocks, blockRecipient{
chanPoint: channel.cfg.ChanPoint,
blocks: channel.blocks,
quit: channel.quit,
})
}
c.Unlock()
return blocks
}
// On exit, cancel our blocks subscription and close each block channel
// so that the arbitrators know they will no longer be receiving blocks.
defer func() {
blockEpoch.Cancel()
recipients := getRecipients()
for _, recipient := range recipients {
close(recipient.blocks)
}
}()
// Consume block epochs until we receive the instruction to shutdown. // Consume block epochs until we receive the instruction to shutdown.
for { for {
select { select {
// Consume block epochs, exiting if our subscription is // Consume block epochs, exiting if our subscription is
// terminated. // terminated.
case block, ok := <-blockEpoch.Epochs: case beat := <-c.BlockbeatChan:
if !ok { // Set the current blockbeat.
log.Trace("dispatchBlocks block epoch " + c.beat = beat
"cancelled")
return
}
// Get the set of currently active channels block // Send this blockbeat to all the active channels and
// subscription channels and dispatch the block to // wait for them to finish processing it.
// each. c.handleBlockbeat(beat)
for _, recipient := range getRecipients() {
select {
// Deliver the block to the arbitrator.
case recipient.blocks <- block.Height:
// If the recipient is shutting down, exit
// without delivering the block. This may be
// the case when two blocks are mined in quick
// succession, and the arbitrator resolves
// after the first block, and does not need to
// consume the second block.
case <-recipient.quit:
log.Debugf("channel: %v exit without "+
"receiving block: %v",
recipient.chanPoint,
block.Height)
// If the chain arb is shutting down, we don't
// need to deliver any more blocks (everything
// will be shutting down).
case <-c.quit:
return
}
}
// Exit if the chain arbitrator is shutting down. // Exit if the chain arbitrator is shutting down.
case <-c.quit: case <-c.quit:
@@ -912,6 +836,32 @@ func (c *ChainArbitrator) dispatchBlocks(
} }
} }
// handleBlockbeat sends the blockbeat to all active channel arbitrator in
// parallel and wait for them to finish processing it.
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
// Read the active channels in a lock.
c.Lock()
// Create a slice to record active channel arbitrator.
channels := make([]chainio.Consumer, 0, len(c.activeChannels))
// Copy the active channels to the slice.
for _, channel := range c.activeChannels {
channels = append(channels, channel)
}
c.Unlock()
// Iterate all the copied channels and send the blockbeat to them.
//
// NOTE: This method will timeout if the processing of blocks of the
// subsystems is too long (60s).
err := chainio.DispatchConcurrent(beat, channels)
// Notify the chain arbitrator has processed the block.
c.NotifyBlockProcessed(beat, err)
}
// republishClosingTxs will load any stored cooperative or unilateral closing // republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the // transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed. // transactions in the event that prior publications failed.

View File

@@ -77,7 +77,6 @@ func TestChainArbitratorRepublishCloses(t *testing.T) {
ChainIO: &mock.ChainIO{}, ChainIO: &mock.ChainIO{},
Notifier: &mock.ChainNotifier{ Notifier: &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail), SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation), ConfChan: make(chan *chainntnfs.TxConfirmation),
}, },
PublishTx: func(tx *wire.MsgTx, _ string) error { PublishTx: func(tx *wire.MsgTx, _ string) error {
@@ -158,7 +157,6 @@ func TestResolveContract(t *testing.T) {
ChainIO: &mock.ChainIO{}, ChainIO: &mock.ChainIO{},
Notifier: &mock.ChainNotifier{ Notifier: &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail), SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation), ConfChan: make(chan *chainntnfs.TxConfirmation),
}, },
PublishTx: func(tx *wire.MsgTx, _ string) error { PublishTx: func(tx *wire.MsgTx, _ string) error {