diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index bae9c1c97..ca1bd214b 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -68,7 +68,7 @@ type BtcdNotifier struct { confNotifications map[chainhash.Hash][]*confirmationsNotification confHeap *confirmationHeap - blockEpochClients map[uint64]chan *chainntnfs.BlockEpoch + blockEpochClients map[uint64]*blockEpochRegistration disconnectedBlockHashes chan *blockNtfn @@ -95,7 +95,7 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notificationCancels: make(chan interface{}), notificationRegistry: make(chan interface{}), - blockEpochClients: make(map[uint64]chan *chainntnfs.BlockEpoch), + blockEpochClients: make(map[uint64]*blockEpochRegistration), spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), @@ -185,7 +185,8 @@ func (b *BtcdNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { - close(epochClient) + close(epochClient.cancelChan) + close(epochClient.epochChan) } return nil @@ -260,7 +261,8 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - close(b.blockEpochClients[msg.epochID]) + close(b.blockEpochClients[msg.epochID].cancelChan) + close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) close(msg.done) @@ -292,7 +294,7 @@ out: b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") - b.blockEpochClients[msg.epochID] = msg.epochChan + b.blockEpochClients[msg.epochID] = msg } case staleBlockHash := <-b.disconnectedBlockHashes: @@ -481,17 +483,21 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash Hash: newSha, } - for _, epochChan := range b.blockEpochClients { + for _, epochClient := range b.blockEpochClients { b.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch) { + go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) { defer b.wg.Done() select { case ntfnChan <- epoch: + + case <-cancelChan: + return + case <-b.quit: return } - }(epochChan) + }(epochClient.epochChan, epochClient.cancelChan) } } @@ -540,9 +546,9 @@ func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, // requested, or place the notification on the // confirmation heap for future usage. if confClients, ok := b.confNotifications[*txSha]; ok { - // Either all of the registered confirmations wtill be + // Either all of the registered confirmations will be // dispatched due to a single confirmation, or added to the - // conf head. Therefor we unconditioanlly delete the registered + // conf head. Therefor we unconditionally delete the registered // confirmations from the staging zone. defer func() { delete(b.confNotifications, *txSha) @@ -706,6 +712,8 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + cancelChan chan struct{} + epochID uint64 } @@ -722,8 +730,9 @@ type epochCancel struct { // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { registration := &blockEpochRegistration{ - epochChan: make(chan *chainntnfs.BlockEpoch, 20), - epochID: atomic.AddUint64(&b.epochClientCounter, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 20), + cancelChan: make(chan struct{}), + epochID: atomic.AddUint64(&b.epochClientCounter, 1), } select {