From b7de0eae93e81e1b7250f48356c7c306f512cf23 Mon Sep 17 00:00:00 2001 From: eugene Date: Fri, 16 Jul 2021 15:35:43 -0400 Subject: [PATCH] chainntnfs/neutrinonotify: make chainUpdates a buffered chan ConcurrentQueue has internal structures so if a filterUpdate exists in it, the idea of draining the queue may not work reliably. The update may exist in the ConcurrentQueue but may not be available via ChanOut() when we're ready to drain the ConcurrentQueue. Fix this by using a regular buffered chan, which will either have the update or not have the update. Its size is set to 100 as our tests may generate quite a bit of updates. --- chainntnfs/neutrinonotify/neutrino.go | 21 ++++++++++----------- chainntnfs/neutrinonotify/neutrino_dev.go | 5 ++--- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 2320c0e6e..ef96eaa8a 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -62,8 +62,9 @@ type NeutrinoNotifier struct { rescanErr <-chan error - chainUpdates *queue.ConcurrentQueue - txUpdates *queue.ConcurrentQueue + chainUpdates chan *filteredBlock + + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), - chainUpdates: queue.NewConcurrentQueue(10), - txUpdates: queue.NewConcurrentQueue(10), + chainUpdates: make(chan *filteredBlock, 100), + + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.chainUpdates.Stop() n.txUpdates.Stop() // Notify all pending clients of our shutdown by closing the related @@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. - n.chainUpdates.Start() n.txUpdates.Start() // First, we'll obtain the latest block height of the p2p node. We'll @@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error { startingPoint, err := n.p2pNode.BestBlock() if err != nil { n.txUpdates.Stop() - n.chainUpdates.Stop() return err } n.bestBlock.Hash = &startingPoint.Hash @@ -251,7 +250,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, @@ -269,7 +268,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), connect: false, @@ -413,8 +412,8 @@ out: msg.errChan <- err } - case item := <-n.chainUpdates.ChanOut(): - update := item.(*filteredBlock) + case item := <-n.chainUpdates: + update := item if update.connect { n.bestBlockMtx.Lock() // Since neutrino has no way of knowing what diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index 728514aa8..d19783cce 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, ) n.rescanErr = n.chainView.Start() - n.chainUpdates.Start() n.txUpdates.Start() if generateBlocks != nil { @@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, loop: for { select { - case ntfn := <-n.chainUpdates.ChanOut(): - lastReceivedNtfn := ntfn.(*filteredBlock) + case ntfn := <-n.chainUpdates: + lastReceivedNtfn := ntfn if lastReceivedNtfn.height >= uint32(syncHeight) { break loop }