Merge pull request #3495 from halseth/router-not-starte-deadlock

chainntnfs: start concurrent queues prior to connection
This commit is contained in:
Conner Fromknecht
2019-09-16 12:19:54 -07:00
committed by GitHub
2 changed files with 38 additions and 11 deletions

View File

@ -139,17 +139,24 @@ func (b *BtcdNotifier) Start() error {
return nil return nil
} }
// Start our concurrent queues before starting the chain connection, to
// ensure onBlockConnected and onRedeemingTx callbacks won't be
// blocked.
b.chainUpdates.Start()
b.txUpdates.Start()
// Connect to btcd, and register for notifications on connected, and // Connect to btcd, and register for notifications on connected, and
// disconnected blocks. // disconnected blocks.
if err := b.chainConn.Connect(20); err != nil { if err := b.chainConn.Connect(20); err != nil {
return err b.txUpdates.Stop()
} b.chainUpdates.Stop()
if err := b.chainConn.NotifyBlocks(); err != nil {
return err return err
} }
currentHash, currentHeight, err := b.chainConn.GetBestBlock() currentHash, currentHeight, err := b.chainConn.GetBestBlock()
if err != nil { if err != nil {
b.txUpdates.Stop()
b.chainUpdates.Stop()
return err return err
} }
@ -163,8 +170,11 @@ func (b *BtcdNotifier) Start() error {
Hash: currentHash, Hash: currentHash,
} }
b.chainUpdates.Start() if err := b.chainConn.NotifyBlocks(); err != nil {
b.txUpdates.Start() b.txUpdates.Stop()
b.chainUpdates.Stop()
return err
}
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
@ -208,10 +218,14 @@ func (b *BtcdNotifier) Stop() error {
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
b.chainUpdates.ChanIn() <- &chainUpdate{ select {
case b.chainUpdates.ChanIn() <- &chainUpdate{
blockHash: hash, blockHash: hash,
blockHeight: height, blockHeight: height,
connect: true, connect: true,
}:
case <-b.quit:
return
} }
} }
@ -236,10 +250,14 @@ type filteredBlock struct {
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
b.chainUpdates.ChanIn() <- &chainUpdate{ select {
case b.chainUpdates.ChanIn() <- &chainUpdate{
blockHash: hash, blockHash: hash,
blockHeight: height, blockHeight: height,
connect: false, connect: false,
}:
case <-b.quit:
return
} }
} }
@ -247,7 +265,11 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
// Append this new transaction update to the end of the queue of new // Append this new transaction update to the end of the queue of new
// chain updates. // chain updates.
b.txUpdates.ChanIn() <- &txUpdate{tx, details} select {
case b.txUpdates.ChanIn() <- &txUpdate{tx, details}:
case <-b.quit:
return
}
} }
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client

View File

@ -116,12 +116,20 @@ func (n *NeutrinoNotifier) Start() error {
return nil return nil
} }
// Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelavantTx callbacks won't be
// blocked.
n.chainUpdates.Start()
n.txUpdates.Start()
// First, we'll obtain the latest block height of the p2p node. We'll // First, we'll obtain the latest block height of the p2p node. We'll
// start the auto-rescan from this point. Once a caller actually wishes // start the auto-rescan from this point. Once a caller actually wishes
// to register a chain view, the rescan state will be rewound // to register a chain view, the rescan state will be rewound
// accordingly. // accordingly.
startingPoint, err := n.p2pNode.BestBlock() startingPoint, err := n.p2pNode.BestBlock()
if err != nil { if err != nil {
n.txUpdates.Stop()
n.chainUpdates.Stop()
return err return err
} }
n.bestBlock.Hash = &startingPoint.Hash n.bestBlock.Hash = &startingPoint.Hash
@ -160,9 +168,6 @@ func (n *NeutrinoNotifier) Start() error {
) )
n.rescanErr = n.chainView.Start() n.rescanErr = n.chainView.Start()
n.chainUpdates.Start()
n.txUpdates.Start()
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()