chainio: add BlockbeatDispatcher to dispatch blockbeats

This commit adds a blockbeat dispatcher which handles sending new blocks
to all subscribed consumers.
This commit is contained in:
yyforyongyu
2024-06-27 08:43:26 +08:00
parent a1eb87e280
commit 4b83d87baa
2 changed files with 416 additions and 0 deletions

View File

@@ -3,7 +3,12 @@ package chainio
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
)
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
@@ -14,6 +19,195 @@ var DefaultProcessBlockTimeout = 60 * time.Second
// to process the block.
var ErrProcessBlockTimeout = errors.New("process block timeout")
// BlockbeatDispatcher is a service that handles dispatching new blocks to
// `lnd`'s subsystems. During startup, subsystems that are block-driven should
// implement the `Consumer` interface and register themselves via
// `RegisterQueue`. When two subsystems are independent of each other, they
// should be registered in different queues so blocks are notified concurrently.
// Otherwise, when living in the same queue, the subsystems are notified of the
// new blocks sequentially, which means it's critical to understand the
// relationship of these systems to properly handle the order.
type BlockbeatDispatcher struct {
wg sync.WaitGroup
// notifier is used to receive new block epochs.
notifier chainntnfs.ChainNotifier
// beat is the latest blockbeat received.
beat Blockbeat
// consumerQueues is a map of consumers that will receive blocks. Its
// key is a unique counter and its value is a queue of consumers. Each
// queue is notified concurrently, and consumers in the same queue is
// notified sequentially.
consumerQueues map[uint32][]Consumer
// counter is used to assign a unique id to each queue.
counter atomic.Uint32
// quit is used to signal the BlockbeatDispatcher to stop.
quit chan struct{}
}
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
return &BlockbeatDispatcher{
notifier: n,
quit: make(chan struct{}),
consumerQueues: make(map[uint32][]Consumer),
}
}
// RegisterQueue takes a list of consumers and registers them in the same
// queue.
//
// NOTE: these consumers are notified sequentially.
func (b *BlockbeatDispatcher) RegisterQueue(consumers []Consumer) {
qid := b.counter.Add(1)
b.consumerQueues[qid] = append(b.consumerQueues[qid], consumers...)
clog.Infof("Registered queue=%d with %d blockbeat consumers", qid,
len(consumers))
for _, c := range consumers {
clog.Debugf("Consumer [%s] registered in queue %d", c.Name(),
qid)
}
}
// Start starts the blockbeat dispatcher - it registers a block notification
// and monitors and dispatches new blocks in a goroutine. It will refuse to
// start if there are no registered consumers.
func (b *BlockbeatDispatcher) Start() error {
// Make sure consumers are registered.
if len(b.consumerQueues) == 0 {
return fmt.Errorf("no consumers registered")
}
// Start listening to new block epochs. We should get a notification
// with the current best block immediately.
blockEpochs, err := b.notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
clog.Infof("BlockbeatDispatcher is starting with %d consumer queues",
len(b.consumerQueues))
defer clog.Debug("BlockbeatDispatcher started")
b.wg.Add(1)
go b.dispatchBlocks(blockEpochs)
return nil
}
// Stop shuts down the blockbeat dispatcher.
func (b *BlockbeatDispatcher) Stop() {
clog.Info("BlockbeatDispatcher is stopping")
defer clog.Debug("BlockbeatDispatcher stopped")
// Signal the dispatchBlocks goroutine to stop.
close(b.quit)
b.wg.Wait()
}
func (b *BlockbeatDispatcher) log() btclog.Logger {
return b.beat.logger()
}
// dispatchBlocks listens to new block epoch and dispatches it to all the
// consumers. Each queue is notified concurrently, and the consumers in the
// same queue are notified sequentially.
//
// NOTE: Must be run as a goroutine.
func (b *BlockbeatDispatcher) dispatchBlocks(
blockEpochs *chainntnfs.BlockEpochEvent) {
defer b.wg.Done()
defer blockEpochs.Cancel()
for {
select {
case blockEpoch, ok := <-blockEpochs.Epochs:
if !ok {
clog.Debugf("Block epoch channel closed")
return
}
clog.Infof("Received new block %v at height %d, "+
"notifying consumers...", blockEpoch.Hash,
blockEpoch.Height)
// Record the time it takes the consumer to process
// this block.
start := time.Now()
// Update the current block epoch.
b.beat = NewBeat(*blockEpoch)
// Notify all consumers.
err := b.notifyQueues()
if err != nil {
b.log().Errorf("Notify block failed: %v", err)
}
b.log().Infof("Notified all consumers on new block "+
"in %v", time.Since(start))
case <-b.quit:
b.log().Debugf("BlockbeatDispatcher quit signal " +
"received")
return
}
}
}
// notifyQueues notifies each queue concurrently about the latest block epoch.
func (b *BlockbeatDispatcher) notifyQueues() error {
// errChans is a map of channels that will be used to receive errors
// returned from notifying the consumers.
errChans := make(map[uint32]chan error, len(b.consumerQueues))
// Notify each queue in goroutines.
for qid, consumers := range b.consumerQueues {
b.log().Debugf("Notifying queue=%d with %d consumers", qid,
len(consumers))
// Create a signal chan.
errChan := make(chan error, 1)
errChans[qid] = errChan
// Notify each queue concurrently.
go func(qid uint32, c []Consumer, beat Blockbeat) {
// Notify each consumer in this queue sequentially.
errChan <- DispatchSequential(beat, c)
}(qid, consumers, b.beat)
}
// Wait for all consumers in each queue to finish.
for qid, errChan := range errChans {
select {
case err := <-errChan:
if err != nil {
return fmt.Errorf("queue=%d got err: %w", qid,
err)
}
b.log().Debugf("Notified queue=%d", qid)
case <-b.quit:
b.log().Debugf("BlockbeatDispatcher quit signal " +
"received, exit notifyQueues")
return nil
}
}
return nil
}
// DispatchSequential takes a list of consumers and notify them about the new
// epoch sequentially. It requires the consumer to finish processing the block
// within the specified time, otherwise a timeout error is returned.

View File

@@ -4,6 +4,8 @@ import (
"testing"
"time"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
@@ -159,3 +161,223 @@ func TestDispatchSequential(t *testing.T) {
// Check the previous consumer is the last consumer.
require.Equal(t, consumer3.Name(), prevConsumer)
}
// TestRegisterQueue tests the RegisterQueue function.
func TestRegisterQueue(t *testing.T) {
t.Parallel()
// Create two mock consumers.
consumer1 := &MockConsumer{}
defer consumer1.AssertExpectations(t)
consumer1.On("Name").Return("mocker1")
consumer2 := &MockConsumer{}
defer consumer2.AssertExpectations(t)
consumer2.On("Name").Return("mocker2")
consumers := []Consumer{consumer1, consumer2}
// Create a mock chain notifier.
mockNotifier := &chainntnfs.MockChainNotifier{}
defer mockNotifier.AssertExpectations(t)
// Create a new dispatcher.
b := NewBlockbeatDispatcher(mockNotifier)
// Register the consumers.
b.RegisterQueue(consumers)
// Assert that the consumers have been registered.
//
// We should have one queue.
require.Len(t, b.consumerQueues, 1)
// The queue should have two consumers.
queue, ok := b.consumerQueues[1]
require.True(t, ok)
require.Len(t, queue, 2)
}
// TestStartDispatcher tests the Start method.
func TestStartDispatcher(t *testing.T) {
t.Parallel()
// Create a mock chain notifier.
mockNotifier := &chainntnfs.MockChainNotifier{}
defer mockNotifier.AssertExpectations(t)
// Create a new dispatcher.
b := NewBlockbeatDispatcher(mockNotifier)
// Start the dispatcher without consumers should return an error.
err := b.Start()
require.Error(t, err)
// Create a consumer and register it.
consumer := &MockConsumer{}
defer consumer.AssertExpectations(t)
consumer.On("Name").Return("mocker1")
b.RegisterQueue([]Consumer{consumer})
// Mock the chain notifier to return an error.
mockNotifier.On("RegisterBlockEpochNtfn",
mock.Anything).Return(nil, errDummy).Once()
// Start the dispatcher now should return the error.
err = b.Start()
require.ErrorIs(t, err, errDummy)
// Mock the chain notifier to return a valid notifier.
blockEpochs := &chainntnfs.BlockEpochEvent{}
mockNotifier.On("RegisterBlockEpochNtfn",
mock.Anything).Return(blockEpochs, nil).Once()
// Start the dispatcher now should not return an error.
err = b.Start()
require.NoError(t, err)
}
// TestDispatchBlocks asserts the blocks are properly dispatched to the queues.
func TestDispatchBlocks(t *testing.T) {
t.Parallel()
// Create a mock chain notifier.
mockNotifier := &chainntnfs.MockChainNotifier{}
defer mockNotifier.AssertExpectations(t)
// Create a new dispatcher.
b := NewBlockbeatDispatcher(mockNotifier)
// Create the beat and attach it to the dispatcher.
epoch := chainntnfs.BlockEpoch{Height: 1}
beat := NewBeat(epoch)
b.beat = beat
// Create a consumer and register it.
consumer := &MockConsumer{}
defer consumer.AssertExpectations(t)
consumer.On("Name").Return("mocker1")
b.RegisterQueue([]Consumer{consumer})
// Mock the consumer to return nil error on ProcessBlock. This
// implictly asserts that the step `notifyQueues` is successfully
// reached in the `dispatchBlocks` method.
consumer.On("ProcessBlock", mock.Anything).Return(nil).Once()
// Create a test epoch chan.
epochChan := make(chan *chainntnfs.BlockEpoch, 1)
blockEpochs := &chainntnfs.BlockEpochEvent{
Epochs: epochChan,
Cancel: func() {},
}
// Call the method in a goroutine.
done := make(chan struct{})
b.wg.Add(1)
go func() {
defer close(done)
b.dispatchBlocks(blockEpochs)
}()
// Send an epoch.
epoch = chainntnfs.BlockEpoch{Height: 2}
epochChan <- &epoch
// Wait for the dispatcher to process the epoch.
time.Sleep(100 * time.Millisecond)
// Stop the dispatcher.
b.Stop()
// We expect the dispatcher to stop immediately.
_, err := fn.RecvOrTimeout(done, time.Second)
require.NoError(t, err)
}
// TestNotifyQueuesSuccess checks when the dispatcher successfully notifies all
// the queues, no error is returned.
func TestNotifyQueuesSuccess(t *testing.T) {
t.Parallel()
// Create two mock consumers.
consumer1 := &MockConsumer{}
defer consumer1.AssertExpectations(t)
consumer1.On("Name").Return("mocker1")
consumer2 := &MockConsumer{}
defer consumer2.AssertExpectations(t)
consumer2.On("Name").Return("mocker2")
// Create two queues.
queue1 := []Consumer{consumer1}
queue2 := []Consumer{consumer2}
// Create a mock chain notifier.
mockNotifier := &chainntnfs.MockChainNotifier{}
defer mockNotifier.AssertExpectations(t)
// Create a mock beat.
mockBeat := &MockBlockbeat{}
defer mockBeat.AssertExpectations(t)
mockBeat.On("logger").Return(clog)
// Create a new dispatcher.
b := NewBlockbeatDispatcher(mockNotifier)
// Register the queues.
b.RegisterQueue(queue1)
b.RegisterQueue(queue2)
// Attach the blockbeat.
b.beat = mockBeat
// Mock the consumers to return nil error on ProcessBlock for
// both calls.
consumer1.On("ProcessBlock", mockBeat).Return(nil).Once()
consumer2.On("ProcessBlock", mockBeat).Return(nil).Once()
// Notify the queues. The mockers will be asserted in the end to
// validate the calls.
err := b.notifyQueues()
require.NoError(t, err)
}
// TestNotifyQueuesError checks when one of the queue returns an error, this
// error is returned by the method.
func TestNotifyQueuesError(t *testing.T) {
t.Parallel()
// Create a mock consumer.
consumer := &MockConsumer{}
defer consumer.AssertExpectations(t)
consumer.On("Name").Return("mocker1")
// Create one queue.
queue := []Consumer{consumer}
// Create a mock chain notifier.
mockNotifier := &chainntnfs.MockChainNotifier{}
defer mockNotifier.AssertExpectations(t)
// Create a mock beat.
mockBeat := &MockBlockbeat{}
defer mockBeat.AssertExpectations(t)
mockBeat.On("logger").Return(clog)
// Create a new dispatcher.
b := NewBlockbeatDispatcher(mockNotifier)
// Register the queues.
b.RegisterQueue(queue)
// Attach the blockbeat.
b.beat = mockBeat
// Mock the consumer to return an error on ProcessBlock.
consumer.On("ProcessBlock", mockBeat).Return(errDummy).Once()
// Notify the queues. The mockers will be asserted in the end to
// validate the calls.
err := b.notifyQueues()
require.ErrorIs(t, err, errDummy)
}