diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 6f9c8a3ab..f59582fe1 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -103,6 +103,11 @@ func testSingleConfirmationNotification(miner *rpctest.Harness, select { case confInfo := <-confIntent.Confirmed: + if !confInfo.BlockHash.IsEqual(blockHash[0]) { + t.Fatalf("mismatched block hashes: expected %v, got %v", + blockHash[0], confInfo.BlockHash) + } + // Finally, we'll verify that the tx index returned is the exact same // as the tx index of the transaction within the block itself. msgBlock, err := miner.Node.GetBlock(blockHash[0]) diff --git a/chainntnfs/neutrinonotify/confheap.go b/chainntnfs/neutrinonotify/confheap.go deleted file mode 100644 index 4ac76390a..000000000 --- a/chainntnfs/neutrinonotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package neutrinonotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. . -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest dleta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 821252d11..518ec9a92 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -1,8 +1,8 @@ package neutrinonotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -22,6 +22,12 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "neutrino" + + // reorgSafetyLimit is the chain depth beyond which it is assumed a block + // will not be reorganized out of the chain. This is used to determine when + // to prune old confirmation requests so that reorgs are handled correctly. + // The coinbase maturity period is a reasonable value to use. + reorgSafetyLimit = 100 ) var ( @@ -58,8 +64,7 @@ type NeutrinoNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration @@ -88,9 +93,6 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - p2pNode: node, rescanErr: make(chan error), @@ -142,6 +144,9 @@ func (n *NeutrinoNotifier) Start() error { neutrino.WatchTxIDs(zeroHash), } + n.txConfNotifier = chainntnfs.NewTxConfNotifier( + bestHeight, reorgSafetyLimit) + // Finally, we'll create our rescan struct, start it, and launch all // the goroutines we need to operate this ChainNotifier instance. n.chainView = n.p2pNode.NewRescan(rescanOptions...) @@ -174,15 +179,10 @@ func (n *NeutrinoNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range n.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range n.blockEpochClients { close(epochClient.epochChan) } + n.txConfNotifier.TearDown() return nil } @@ -284,35 +284,39 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.spendNotifications[op][msg.spendID] = msg case *confirmationsNotification: - chainntnfs.Log.Infof("New confirmations "+ - "subscription: txid=%v, numconfs=%v, "+ - "height_hint=%v", *msg.txid, - msg.numConfirmations, msg.heightHint) + chainntnfs.Log.Infof("New confirmations subscription: "+ + "txid=%v, numconfs=%v, height_hint=%v", + msg.TxID, msg.NumConfirmations, msg.heightHint) // If the notification can be partially or // fully dispatched, then we can skip the first // phase for ntfns. n.heightMtx.RLock() currentHeight := n.bestHeight - if n.attemptHistoricalDispatch(msg, currentHeight, msg.heightHint) { - n.heightMtx.RUnlock() - continue - } n.heightMtx.RUnlock() - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddTxIDs(*msg.txid), - neutrino.Rewind(currentHeight), - } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, + msg.heightHint) + if err != nil { + chainntnfs.Log.Error(err) } - txid := *msg.txid - n.confNotifications[txid] = append(n.confNotifications[txid], msg) + if txConf == nil { + // If we can't fully dispatch confirmation, + // then we'll update our filter so we can be + // notified of its future initial confirmation. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddTxIDs(*msg.TxID), + neutrino.Rewind(currentHeight), + } + if err := n.chainView.Update(rescanUpdate...); err != nil { + chainntnfs.Log.Errorf("unable to update rescan: %v", err) + } + } + + n.txConfNotifier.Register(&msg.ConfNtfn, txConf) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") @@ -352,6 +356,11 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Infof("Block disconnected from main chain: "+ "height=%v, sha=%v", update.height, update.hash) + + err := n.txConfNotifier.DisconnectTip(update.height) + if err != nil { + chainntnfs.Log.Error(err) + } } case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -363,32 +372,20 @@ func (n *NeutrinoNotifier) notificationDispatcher() { } } -// attemptHistoricalDispatch attempts to consult the historical chain data to -// see if a transaction has already reached full confirmation status at the -// time a notification for it was registered. If it has, then we do an -// immediate dispatch. Otherwise, we'll add the partially confirmed transaction -// to the confirmation heap. -func (n *NeutrinoNotifier) attemptHistoricalDispatch(msg *confirmationsNotification, - currentHeight, heightHint uint32) bool { - - targetHash := msg.txid - - var confDetails *chainntnfs.TxConfirmation - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, + currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. -chainScan: for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) if err != nil { - chainntnfs.Log.Errorf("unable to get header for "+ - "height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to get header for height=%v: %v", + scanHeight, err) } blockHash := header.BlockHash() @@ -397,9 +394,8 @@ chainScan: regFilter, err := n.p2pNode.GetCFilter(blockHash, wire.GCSFilterRegular) if err != nil { - chainntnfs.Log.Errorf("unable to retrieve regular "+ - "filter for height=%v: %v", scanHeight, err) - return false + return nil, fmt.Errorf("unable to retrieve regular filter for "+ + "height=%v: %v", scanHeight, err) } // If the block has no transactions other than the coinbase @@ -414,8 +410,7 @@ chainScan: key := builder.DeriveKey(&blockHash) match, err := regFilter.Match(key, targetHash[:]) if err != nil { - chainntnfs.Log.Errorf("unable to query filter: %v", err) - return false + return nil, fmt.Errorf("unable to query filter: %v", err) } // If there's no match, then we can continue forward to the @@ -429,54 +424,22 @@ chainScan: // to send the proper response. block, err := n.p2pNode.GetBlockFromNetwork(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block from "+ - "network: %v", err) - return false + return nil, fmt.Errorf("unable to get block from network: %v", err) } for j, tx := range block.Transactions() { txHash := tx.Hash() if txHash.IsEqual(targetHash) { - confDetails = &chainntnfs.TxConfirmation{ + confDetails := chainntnfs.TxConfirmation{ BlockHash: &blockHash, BlockHeight: scanHeight, TxIndex: uint32(j), } - break chainScan + return &confDetails, nil } } } - // If it hasn't yet been confirmed, then we can exit early. - if confDetails == nil { - return false - } - - // Otherwise, we'll calculate the number of confirmations that the - // transaction has so we can decide if it has reached the desired - // number of confirmations or not. - txConfs := currentHeight - confDetails.BlockHeight + 1 - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(txConfs) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", msg.numConfirmations, currentHeight) - msg.finConf <- confDetails - return true - } - - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - confHeight := confDetails.BlockHeight + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, - } - heap.Push(n.confHeap, heapEntry) - - return true + return nil, nil } // handleBlocksConnected applies a chain update for a new block. Any watched @@ -489,14 +452,8 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // Next, we'll scan over the list of relevant transactions and possibly // dispatch notifications for confirmations and spends. for _, tx := range newBlock.txns { - // Check if the inclusion of this transaction within a block by itself - // triggers a block confirmation threshold, if so send a notification. - // Otherwise, place the notification on a heap to be triggered in the - // future once additional confirmations are attained. mtx := tx.MsgTx() - txIndex := tx.Index() txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, newBlock, txIndex) for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint @@ -537,7 +494,7 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // A new block has been connected to the main chain. // Send out any N confirmation notifications which may // have been triggered by this new block. - n.notifyConfs(int32(newBlock.height)) + n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) return nil } @@ -572,91 +529,6 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (n *NeutrinoNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if n.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a min-heap, so the - // confirmation notification which requires the smallest block-height - // will always be at the top of the heap. If a confirmation - // notification is eligible for triggering, then fire it off, and check - // if another is eligible until there are no more eligible entries. - nextConf := heap.Pop(n.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - - nextConf.finConf <- nextConf.initialConfDetails - - if n.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(n.confHeap).(*confEntry) - } - - heap.Push(n.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at -// blockHeight triggers any single confirmation notifications. In the event -// that the txid matches, yet needs additional confirmations, it is added to -// the confirmation heap to be triggered at a later time. -func (n *NeutrinoNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *filteredBlock, txIndex int) { - - // If a confirmation notification has been registered for this txid, - // then either trigger a notification event if only a single - // confirmation notification was requested, or place the notification - // on the confirmation heap for future usage. - if confClients, ok := n.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefor we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(n.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: &newTip.hash, - BlockHeight: uint32(newTip.height), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.height) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more than one - // confirmation before triggering. So we create a - // heapConf entry for this notification. The heapConf - // allows us to easily keep track of which - // notification(s) we should fire off with each - // incoming block. - confClient.initialConfirmHeight = uint32(newTip.height) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(n.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -799,15 +671,8 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - + chainntnfs.ConfNtfn heightHint uint32 - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business } // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier @@ -817,21 +682,19 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - heightHint: heightHint, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + ConfNtfn: chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, + heightHint: heightHint, } select { case <-n.quit: return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } }