chainntnfs/neutrinonotify: fix deadlock when notifying blocks, wait for epoch coroutines to exit

This commit is contained in:
Olaoluwa Osuntokun
2017-06-07 17:05:16 -07:00
parent 625d57aea6
commit baf769eaf0

View File

@ -64,8 +64,13 @@ type NeutrinoNotifier struct {
rescanErr <-chan error rescanErr <-chan error
newBlocks chan filteredBlock newBlocksMtx sync.Mutex
staleBlocks chan filteredBlock newBlocks []*filteredBlock
newBlocksUpdateSignal chan struct{}
staleBlocksMtx sync.Mutex
staleBlocks []*filteredBlock
staleBlocksUpdateSignal chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -95,8 +100,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
rescanErr: make(chan error), rescanErr: make(chan error),
newBlocks: make(chan filteredBlock), newBlocksUpdateSignal: make(chan struct{}),
staleBlocks: make(chan filteredBlock),
staleBlocksUpdateSignal: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -178,7 +184,6 @@ func (n *NeutrinoNotifier) Stop() error {
} }
} }
for _, epochClient := range n.blockEpochClients { for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
close(epochClient.epochChan) close(epochClient.epochChan)
} }
@ -200,11 +205,22 @@ type filteredBlock struct {
func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, txns []*btcutil.Tx) { header *wire.BlockHeader, txns []*btcutil.Tx) {
n.newBlocks <- filteredBlock{ // Append this new chain update to the end of the queue of new chain
// updates.
n.newBlocksMtx.Lock()
n.newBlocks = append(n.newBlocks, &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
txns: txns, txns: txns,
} })
n.newBlocksMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rescan.
go func() {
n.newBlocksUpdateSignal <- struct{}{}
}()
} }
// onFilteredBlockDisconnected is a callback which is executed each time a new // onFilteredBlockDisconnected is a callback which is executed each time a new
@ -212,10 +228,21 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
header *wire.BlockHeader) { header *wire.BlockHeader) {
n.staleBlocks <- filteredBlock{ // Append this new chain update to the end of the queue of new chain
// disconnects.
n.staleBlocksMtx.Lock()
n.staleBlocks = append(n.staleBlocks, &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
} })
n.staleBlocksMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rescan.
go func() {
n.staleBlocksUpdateSignal <- struct{}{}
}()
} }
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
@ -243,7 +270,17 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
chainntnfs.Log.Infof("Cancelling epoch "+ chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID) "notification, epoch_id=%v", msg.epochID)
// First, close the cancel channel for this
// specific client, and wait for the client to
// exit.
close(n.blockEpochClients[msg.epochID].cancelChan) close(n.blockEpochClients[msg.epochID].cancelChan)
n.blockEpochClients[msg.epochID].wg.Wait()
// Once the client has exited, we can then
// safely close the channel used to send epoch
// notifications, in order to notify any
// listeners that the intent has been
// cancelled.
close(n.blockEpochClients[msg.epochID].epochChan) close(n.blockEpochClients[msg.epochID].epochChan)
delete(n.blockEpochClients, msg.epochID) delete(n.blockEpochClients, msg.epochID)
@ -298,7 +335,15 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
} }
case newBlock := <-n.newBlocks: case <-n.newBlocksUpdateSignal:
// A new update is available, so pop the new chain
// update from the front of the update queue.
n.newBlocksMtx.Lock()
newBlock := n.newBlocks[0]
n.newBlocks[0] = nil // Set to nil to prevent GC leak.
n.newBlocks = n.newBlocks[1:]
n.newBlocksMtx.Unlock()
n.heightMtx.Lock() n.heightMtx.Lock()
n.bestHeight = newBlock.height n.bestHeight = newBlock.height
n.heightMtx.Unlock() n.heightMtx.Unlock()
@ -324,7 +369,7 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txIndex := tx.Index() txIndex := tx.Index()
txSha := mtx.TxHash() txSha := mtx.TxHash()
n.checkConfirmationTrigger(&txSha, &newBlock, txIndex) n.checkConfirmationTrigger(&txSha, newBlock, txIndex)
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
@ -367,7 +412,14 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
// have been triggered by this new block. // have been triggered by this new block.
n.notifyConfs(int32(newBlock.height)) n.notifyConfs(int32(newBlock.height))
case staleBlock := <-n.staleBlocks: case <-n.staleBlocksUpdateSignal:
// A new update is available, so pop the new chain
// update from the front of the update queue.
n.staleBlocksMtx.Lock()
staleBlock := n.staleBlocks[0]
n.staleBlocks[0] = nil // Set to nil to prevent GC leak.
n.staleBlocks = n.staleBlocks[1:]
n.staleBlocksMtx.Unlock()
chainntnfs.Log.Warnf("Block disconnected from main "+ chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlock.hash) "chain: %v", staleBlock.hash)
@ -509,7 +561,11 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.
for _, epochClient := range n.blockEpochClients { for _, epochClient := range n.blockEpochClients {
n.wg.Add(1) n.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) { epochClient.wg.Add(1)
go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{},
clientWg sync.WaitGroup) {
defer clientWg.Done()
defer n.wg.Done() defer n.wg.Done()
select { select {
@ -521,7 +577,7 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.
case <-n.quit: case <-n.quit:
return return
} }
}(epochClient.epochChan, epochClient.cancelChan) }(epochClient.epochChan, epochClient.cancelChan, epochClient.wg)
} }
} }
@ -757,11 +813,13 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// blockEpochRegistration represents a client's intent to receive a // blockEpochRegistration represents a client's intent to receive a
// notification with each newly connected block. // notification with each newly connected block.
type blockEpochRegistration struct { type blockEpochRegistration struct {
epochID uint64
epochChan chan *chainntnfs.BlockEpoch epochChan chan *chainntnfs.BlockEpoch
cancelChan chan struct{} cancelChan chan struct{}
epochID uint64 wg sync.WaitGroup
} }
// epochCancel is a message sent to the NeutrinoNotifier when a client wishes // epochCancel is a message sent to the NeutrinoNotifier when a client wishes