From 4b15310c0827058f28d828e6429e76e233e418ad Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 5 May 2017 15:53:09 -0700 Subject: [PATCH] chainntfns/btcdnotify: eliminate block epoch race condition, use diff cancel channel This commit fixes a race condition that was uncovered by the race condition detector surrounding cancelling active block epoch notifications. Previously we would close the main notification channel for each client, at tine this would cause a read/write race condition if an active grouting was attempting to dispatch a notification. We now fix this use by using a distinct channel for signaling cancellation to the active grouting, and another to signal cancellation to any notification observers. --- chainntnfs/btcdnotify/btcd.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index bae9c1c97..ca1bd214b 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -68,7 +68,7 @@ type BtcdNotifier struct { confNotifications map[chainhash.Hash][]*confirmationsNotification confHeap *confirmationHeap - blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch + blockEpochClients map[uint64]*blockEpochRegistration disconnectedBlockHashes chan *blockNtfn @@ -95,7 +95,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), - blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch), + blockEpochClients: make(map[uint64]*blockEpochRegistration), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), @@ -185,7 +185,8 @@ func (b *BtcdNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { - close(epochClient) + close(epochClient.cancelChan) + close(epochClient.epochChan) } return nil @@ -260,7 +261,8 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - close(b.blockEpochClients[msg.epochID]) + close(b.blockEpochClients[msg.epochID].cancelChan) + close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) close(msg.done) @@ -292,7 +294,7 @@ out: b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") - b.blockEpochClients[msg.epochID] = msg.epochChan + b.blockEpochClients[msg.epochID] = msg } case staleBlockHash := <-b.disconnectedBlockHashes: @@ -481,17 +483,21 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash Hash: newSha, } - for _, epochChan := range b.blockEpochClients { + for _, epochClient := range b.blockEpochClients { b.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch) { + go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) { defer b.wg.Done() select { case ntfnChan <- epoch: + + case <-cancelChan: + return + case <-b.quit: return } - }(epochChan) + }(epochClient.epochChan, epochClient.cancelChan) } } @@ -540,9 +546,9 @@ func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, // requested, or place the notification on the // confirmation heap for future usage. if confClients, ok := b.confNotifications[*txSha]; ok { - // Either all of the registered confirmations wtill be + // Either all of the registered confirmations will be // dispatched due to a single confirmation, or added to the - // conf head. Therefor we unconditioanlly delete the registered + // conf head. Therefor we unconditionally delete the registered // confirmations from the staging zone. defer func() { delete(b.confNotifications, *txSha) @@ -706,6 +712,8 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + cancelChan chan struct{} + epochID uint64 } @@ -722,8 +730,9 @@ type epochCancel struct { // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { registration := &blockEpochRegistration{ - epochChan: make(chan *chainntnfs.BlockEpoch, 20), - epochID: atomic.AddUint64(&b.epochClientCounter, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 20), + cancelChan: make(chan struct{}), + epochID: atomic.AddUint64(&b.epochClientCounter, 1), } select {