chainio: use errgroup to limit num of goroutines

This commit is contained in:
yyforyongyu
2024-11-19 17:34:09 +08:00
parent 1d53e7d081
commit 0bab6b3419

View File

@@ -9,6 +9,7 @@ import (
"github.com/btcsuite/btclog/v2" "github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"golang.org/x/sync/errgroup"
) )
// DefaultProcessBlockTimeout is the timeout value used when waiting for one // DefaultProcessBlockTimeout is the timeout value used when waiting for one
@@ -229,35 +230,31 @@ func DispatchSequential(b Blockbeat, consumers []Consumer) error {
// It requires the consumer to finish processing the block within the specified // It requires the consumer to finish processing the block within the specified
// time, otherwise a timeout error is returned. // time, otherwise a timeout error is returned.
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error { func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
// errChans is a map of channels that will be used to receive errors eg := &errgroup.Group{}
// returned from notifying the consumers.
errChans := make(map[string]chan error, len(consumers))
// Notify each queue in goroutines. // Notify each queue in goroutines.
for _, c := range consumers { for _, c := range consumers {
// Create a signal chan.
errChan := make(chan error, 1)
errChans[c.Name()] = errChan
// Notify each consumer concurrently. // Notify each consumer concurrently.
go func(c Consumer, beat Blockbeat) { eg.Go(func() error {
// Send the beat to the consumer. // Send the beat to the consumer.
errChan <- notifyAndWait( err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
b, c, DefaultProcessBlockTimeout,
) // Exit early if there's no error.
}(c, b) if err == nil {
return nil
}
b.logger().Errorf("Consumer=%v failed to process "+
"block: %v", c.Name(), err)
return err
})
} }
// Wait for all consumers in each queue to finish. // Wait for all consumers in each queue to finish.
for name, errChan := range errChans { if err := eg.Wait(); err != nil {
err := <-errChan
if err != nil {
b.logger().Errorf("Consumer=%v failed to process "+
"block: %v", name, err)
return err return err
} }
}
return nil return nil
} }