diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 9dc40a982..26dd11fd5 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -65,13 +65,8 @@ type NeutrinoNotifier struct { rescanErr <-chan error - newBlocksMtx sync.Mutex - newBlocks []*filteredBlock - newBlocksUpdateSignal chan struct{} - - staleBlocksMtx sync.Mutex - staleBlocks []*filteredBlock - staleBlocksUpdateSignal chan struct{} + newBlocks *chainntnfs.ConcurrentQueue + staleBlocks *chainntnfs.ConcurrentQueue wg sync.WaitGroup quit chan struct{} @@ -101,9 +96,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { rescanErr: make(chan error), - newBlocksUpdateSignal: make(chan struct{}), + newBlocks: chainntnfs.NewConcurrentQueue(10), - staleBlocksUpdateSignal: make(chan struct{}), + staleBlocks: chainntnfs.NewConcurrentQueue(10), quit: make(chan struct{}), } @@ -155,6 +150,9 @@ func (n *NeutrinoNotifier) Start() error { n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.rescanErr = n.chainView.Start() + n.newBlocks.Start() + n.staleBlocks.Start() + n.wg.Add(1) go n.notificationDispatcher() @@ -171,6 +169,9 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() + n.newBlocks.Stop() + n.staleBlocks.Stop() + // Notify all pending clients of our shutdown by closing the related // notification channels. for _, spendClients := range n.spendNotifications { @@ -208,20 +209,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.newBlocksMtx.Lock() - n.newBlocks = append(n.newBlocks, &filteredBlock{ + n.newBlocks.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, - }) - n.newBlocksMtx.Unlock() - - // Launch a goroutine to signal the notification dispatcher that a new - // transaction update is available. We do this in a new goroutine in - // order to avoid blocking the main loop of the rescan. - go func() { - n.newBlocksUpdateSignal <- struct{}{} - }() + } } // onFilteredBlockDisconnected is a callback which is executed each time a new @@ -231,19 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.staleBlocksMtx.Lock() - n.staleBlocks = append(n.staleBlocks, &filteredBlock{ + n.staleBlocks.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), - }) - n.staleBlocksMtx.Unlock() - - // Launch a goroutine to signal the notification dispatcher that a new - // transaction update is available. We do this in a new goroutine in - // order to avoid blocking the main loop of the rescan. - go func() { - n.staleBlocksUpdateSignal <- struct{}{} - }() + } } // notificationDispatcher is the primary goroutine which handles client @@ -335,14 +318,8 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.blockEpochClients[msg.epochID] = msg } - case <-n.newBlocksUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - n.newBlocksMtx.Lock() - newBlock := n.newBlocks[0] - n.newBlocks[0] = nil // Set to nil to prevent GC leak. - n.newBlocks = n.newBlocks[1:] - n.newBlocksMtx.Unlock() + case item := <-n.newBlocks.ChanOut(): + newBlock := item.(*filteredBlock) n.heightMtx.Lock() n.bestHeight = newBlock.height @@ -417,15 +394,8 @@ func (n *NeutrinoNotifier) notificationDispatcher() { // have been triggered by this new block. n.notifyConfs(int32(newBlock.height)) - case <-n.staleBlocksUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - n.staleBlocksMtx.Lock() - staleBlock := n.staleBlocks[0] - n.staleBlocks[0] = nil // Set to nil to prevent GC leak. - n.staleBlocks = n.staleBlocks[1:] - n.staleBlocksMtx.Unlock() - + case item := <-n.staleBlocks.ChanOut(): + staleBlock := item.(*filteredBlock) chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlock.hash)