diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 8e854442c..9a2d5d760 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -164,6 +164,7 @@ func (n *NeutrinoNotifier) Start() error { n.txNotifier = chainntnfs.NewTxNotifier( n.bestHeight, reorgSafetyLimit, n.confirmHintCache, + n.spendHintCache, ) n.chainConn = &NeutrinoChainConn{n.p2pNode} @@ -603,68 +604,6 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height, newBlock.hash) - // Create a helper struct for coalescing spend notifications triggered - // by this block. - type spendNtfnBatch struct { - details *chainntnfs.SpendDetail - clients map[uint64]*spendNotification - } - - // Scan over the list of relevant transactions and assemble the - // possible spend notifications we need to dispatch. - spendBatches := make(map[wire.OutPoint]spendNtfnBatch) - for _, tx := range newBlock.txns { - mtx := tx.MsgTx() - txSha := mtx.TxHash() - - for i, txIn := range mtx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an output which - // we have a registered notification for, then create a - // spend summary and add it to our batch of spend - // notifications to be delivered. - clients, ok := n.spendNotifications[prevOut] - if !ok { - continue - } - delete(n.spendNotifications, prevOut) - - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &txSha, - SpendingTx: mtx, - SpenderInputIndex: uint32(i), - SpendingHeight: int32(newBlock.height), - } - - spendBatches[prevOut] = spendNtfnBatch{ - details: spendDetails, - clients: clients, - } - - } - } - - // Now, we'll update the spend height hint for all of our watched - // outpoints that have not been spent yet. This is safe to do as we do - // not watch already spent outpoints for spend notifications. - ops := make([]wire.OutPoint, 0, len(n.spendNotifications)) - for op := range n.spendNotifications { - ops = append(ops, op) - } - - if len(ops) > 0 { - err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...) - if err != nil { - // The error is not fatal since we are connecting a - // block, and advancing the spend hint is an optimistic - // optimization. - chainntnfs.Log.Errorf("Unable to update spend hint to "+ - "%d for %v: %v", newBlock.height, ops, err) - } - } - // We want to set the best block before dispatching notifications // so if any subscribers make queries based on their received // block epoch, our state is fully updated in time. @@ -674,23 +613,6 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // of the block. n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, send off the spend details to the notification subscribers. - for _, batch := range spendBatches { - for _, ntfn := range batch.clients { - chainntnfs.Log.Infof("Dispatching spend "+ - "notification for outpoint=%v", - ntfn.targetOutpoint) - - ntfn.spendChan <- batch.details - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } - } - return nil } @@ -766,86 +688,59 @@ type spendCancel struct { func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { - n.heightMtx.RLock() - currentHeight := n.bestHeight - n.heightMtx.RUnlock() - - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := n.spendHintCache.QuerySpendHint(*outpoint); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, outpoint) - heightHint = hint - } + // First, we'll construct a spend notification request and hand it off + // to the txNotifier. + spendID := atomic.AddUint64(&n.spendClientCounter, 1) + cancel := func() { + n.txNotifier.CancelSpend(*outpoint, spendID) + } + ntfn := &chainntnfs.SpendNtfn{ + SpendID: spendID, + OutPoint: *outpoint, + Event: chainntnfs.NewSpendEvent(cancel), + HeightHint: heightHint, } - // Construct a notification request for the outpoint. We'll defer - // sending it to the main event loop until after we've guaranteed that - // the outpoint has not been spent. - ntfn := &spendNotification{ - targetOutpoint: outpoint, - spendChan: make(chan *chainntnfs.SpendDetail, 1), - spendID: atomic.AddUint64(&n.spendClientCounter, 1), - heightHint: heightHint, + historicalDispatch, err := n.txNotifier.RegisterSpend(ntfn) + if err != nil { + return nil, err } - spendEvent := &chainntnfs.SpendEvent{ - Spend: ntfn.spendChan, - Cancel: func() { - cancel := &spendCancel{ - op: *outpoint, - spendID: ntfn.spendID, - } - - // Submit spend cancellation to notification dispatcher. - select { - case n.notificationCancels <- cancel: - // Cancellation is being handled, drain the - // spend chan until it is closed before yielding - // to the caller. - for { - select { - case _, ok := <-ntfn.spendChan: - if !ok { - return - } - case <-n.quit: - return - } - } - case <-n.quit: - } - }, + // If the txNotifier didn't return any details to perform a historical + // scan of the chain, then we can return early as there's nothing left + // for us to do. + if historicalDispatch == nil { + return ntfn.Event, nil } // Ensure that neutrino is caught up to the height hint before we - // attempt to fetch the utxo from the chain. If we're behind, then we + // attempt to fetch the UTXO from the chain. If we're behind, then we // may miss a notification dispatch. for { n.heightMtx.RLock() - currentHeight = n.bestHeight + currentHeight := n.bestHeight n.heightMtx.RUnlock() - if currentHeight < heightHint { - time.Sleep(time.Millisecond * 200) - continue + if currentHeight >= historicalDispatch.StartHeight { + break } - break - } - - inputToWatch := neutrino.InputWithScript{ - OutPoint: *outpoint, - PkScript: pkScript, + time.Sleep(time.Millisecond * 200) } // Before sending off the notification request, we'll attempt to see if // this output is still spent or not at this point in the chain. + inputToWatch := neutrino.InputWithScript{ + OutPoint: *outpoint, + PkScript: pkScript, + } spendReport, err := n.p2pNode.GetUtxo( neutrino.WatchInputs(inputToWatch), neutrino.StartBlock(&waddrmgr.BlockStamp{ - Height: int32(heightHint), + Height: int32(historicalDispatch.StartHeight), + }), + neutrino.EndBlock(&waddrmgr.BlockStamp{ + Height: int32(historicalDispatch.EndHeight), }), ) if err != nil && !strings.Contains(err.Error(), "not found") { @@ -857,30 +752,33 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, if spendReport != nil && spendReport.SpendingTx != nil { // As a result, we'll launch a goroutine to immediately // dispatch the notification with a normal response. - go func() { - txSha := spendReport.SpendingTx.TxHash() - select { - case ntfn.spendChan <- &chainntnfs.SpendDetail{ - SpentOutPoint: outpoint, - SpenderTxHash: &txSha, - SpendingTx: spendReport.SpendingTx, - SpenderInputIndex: spendReport.SpendingInputIndex, - SpendingHeight: int32(spendReport.SpendingTxHeight), - }: - case <-n.quit: - return - } + spendingTxHash := spendReport.SpendingTx.TxHash() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpenderTxHash: &spendingTxHash, + SpendingTx: spendReport.SpendingTx, + SpenderInputIndex: spendReport.SpendingInputIndex, + SpendingHeight: int32(spendReport.SpendingTxHeight), + } - }() + err := n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) + if err != nil { + return nil, err + } - return spendEvent, nil + return ntfn.Event, nil + } + + // If the output is still unspent, then we'll mark our historical rescan + // as complete and update our rescan's filter to watch for the spend of + // the outpoint in question. + if err := n.txNotifier.UpdateSpendDetails(*outpoint, nil); err != nil { + return nil, err } - // If the output is still unspent, then we'll update our rescan's - // filter, and send the request to the dispatcher goroutine. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddInputs(inputToWatch), - neutrino.Rewind(currentHeight), + neutrino.Rewind(historicalDispatch.EndHeight), neutrino.DisableDisconnectedNtfns(true), } diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index b5ff182b1..3a1b19b95 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -50,6 +50,7 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, n.txNotifier = chainntnfs.NewTxNotifier( uint32(bestHeight), reorgSafetyLimit, n.confirmHintCache, + n.spendHintCache, ) n.chainConn = &NeutrinoChainConn{n.p2pNode}