From 4405dac4d046a7b24e3b800a9e0955f901489a22 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 13 Nov 2017 12:42:50 -0800 Subject: [PATCH] chainntnfs/btcd: Refactor BtcdNotifier to use TxConfNotifier. --- chainntnfs/btcdnotify/btcd.go | 265 ++++++++---------------------- chainntnfs/btcdnotify/confheap.go | 58 ------- 2 files changed, 66 insertions(+), 257 deletions(-) delete mode 100644 chainntnfs/btcdnotify/confheap.go diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 249e861f1..bc9004358 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -1,8 +1,8 @@ package btcdnotify import ( - "container/heap" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -20,6 +20,11 @@ const ( // notifierType uniquely identifies this concrete implementation of the // ChainNotifier interface. notifierType = "btcd" + + // reorgSafetyLimit is assumed maximum depth of a chain reorganization. + // After this many confirmation, transaction confirmation info will be + // pruned. + reorgSafetyLimit = 100 ) var ( @@ -69,8 +74,7 @@ type BtcdNotifier struct { spendNotifications map[wire.OutPoint]map[uint64]*spendNotification - confNotifications map[chainhash.Hash][]*confirmationsNotification - confHeap *confirmationHeap + txConfNotifier *chainntnfs.TxConfNotifier blockEpochClients map[uint64]*blockEpochRegistration @@ -96,9 +100,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { spendNotifications: make(map[wire.OutPoint]map[uint64]*spendNotification), - confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), - confHeap: newConfirmationHeap(), - chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), @@ -146,6 +147,9 @@ func (b *BtcdNotifier) Start() error { return err } + b.txConfNotifier = chainntnfs.NewTxConfNotifier( + uint32(currentHeight), reorgSafetyLimit) + b.chainUpdates.Start() b.txUpdates.Start() @@ -179,15 +183,10 @@ func (b *BtcdNotifier) Stop() error { close(spendClient.spendChan) } } - for _, confClients := range b.confNotifications { - for _, confClient := range confClients { - close(confClient.finConf) - close(confClient.negativeConf) - } - } for _, epochClient := range b.blockEpochClients { close(epochClient.epochChan) } + b.txConfNotifier.TearDown() return nil } @@ -277,17 +276,15 @@ out: case *confirmationsNotification: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", - *msg.txid, msg.numConfirmations) + msg.TxID, msg.NumConfirmations) - // If the notification can be partially or - // fully dispatched, then we can skip the first - // phase for ntfns. - if b.attemptHistoricalDispatch(msg) { - continue + // Lookup whether the transaction is already included in the + // active chain. + txConf, err := b.historicalConfDetails(msg.TxID) + if err != nil { + chainntnfs.Log.Error(err) } - - txid := *msg.txid - b.confNotifications[txid] = append(b.confNotifications[txid], msg) + b.txConfNotifier.Register(&msg.ConfNtfn, txConf) case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -304,7 +301,7 @@ out: currentHeight = update.blockHeight - newBlock, err := b.chainConn.GetBlock(update.blockHash) + rawBlock, err := b.chainConn.GetBlock(update.blockHash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) continue @@ -313,26 +310,14 @@ out: chainntnfs.Log.Infof("New block: height=%v, sha=%v", update.blockHeight, update.blockHash) - b.notifyBlockEpochs(update.blockHeight, - update.blockHash) + b.notifyBlockEpochs(update.blockHeight, update.blockHash) - newHeight := update.blockHeight - for i, tx := range newBlock.Transactions { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - txSha := tx.TxHash() - b.checkConfirmationTrigger(&txSha, update, i) + txns := btcutil.NewBlock(rawBlock).Transactions() + err = b.txConfNotifier.ConnectTip(update.blockHash, + uint32(update.blockHeight), txns) + if err != nil { + chainntnfs.Log.Error(err) } - - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(newHeight) } else { if update.blockHeight != currentHeight { chainntnfs.Log.Warnf("Received blocks out of order: "+ @@ -342,12 +327,13 @@ out: currentHeight = update.blockHeight - 1 - // TODO(roasbeef): re-orgs - // * second channel to notify of confirmation decrementing - // re-org? - // * notify of negative confirmations chainntnfs.Log.Infof("Block disconnected from main chain: "+ "height=%v, sha=%v", update.blockHeight, update.blockHash) + + err := b.txConfNotifier.DisconnectTip(uint32(update.blockHeight)) + if err != nil { + chainntnfs.Log.Error(err) + } } case item := <-b.txUpdates.ChanOut(): @@ -403,29 +389,25 @@ out: b.wg.Done() } -// attemptHistoricalDispatch tries to use historical information to decide if a -// notification ca be dispatched immediately, or is partially confirmed so it -// can skip straight to the confirmations heap. -// -// Returns true if the transaction was either partially or completely confirmed -func (b *BtcdNotifier) attemptHistoricalDispatch( - msg *confirmationsNotification) bool { - - chainntnfs.Log.Infof("Attempting to trigger dispatch for %v from "+ - "historical chain", msg.txid) +// historicalConfDetails looks up whether a transaction is already included in a +// block in the active chain and, if so, returns details about the confirmation. +func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, +) (*chainntnfs.TxConfirmation, error) { // If the transaction already has some or all of the confirmations, // then we may be able to dispatch it immediately. - tx, err := b.chainConn.GetRawTransactionVerbose(msg.txid) + tx, err := b.chainConn.GetRawTransactionVerbose(txid) if err != nil || tx == nil || tx.BlockHash == "" { - jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: - chainntnfs.Log.Warnf("unable to query for txid(%v): %v", - msg.txid, err) + if err == nil { + return nil, nil } - return false + // Do not return an error if the transaction was not found. + if jsonErr, ok := err.(*btcjson.RPCError); ok { + if jsonErr.Code == btcjson.ErrRPCNoTxInfo { + return nil, nil + } + } + return nil, fmt.Errorf("unable to query for txid(%v): %v", txid, err) } // As we need to fully populate the returned TxConfirmation struct, @@ -433,55 +415,36 @@ func (b *BtcdNotifier) attemptHistoricalDispatch( // locate its exact index within the block. blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash %v for "+ - "historical dispatch: %v", tx.BlockHash, err) - return false + return nil, fmt.Errorf("unable to get block hash %v for historical "+ + "dispatch: %v", tx.BlockHash, err) } block, err := b.chainConn.GetBlockVerbose(blockHash) if err != nil { - chainntnfs.Log.Errorf("unable to get block hash: %v", err) - return false + return nil, fmt.Errorf("unable to get block hash: %v", err) } // If the block obtained, locate the transaction's index within the // block so we can give the subscriber full confirmation details. - var txIndex uint32 - targetTxidStr := msg.txid.String() + txIndex := -1 + targetTxidStr := txid.String() for i, txHash := range block.Tx { if txHash == targetTxidStr { - txIndex = uint32(i) + txIndex = i break } } - confDetails := &chainntnfs.TxConfirmation{ + if txIndex == -1 { + return nil, fmt.Errorf("unable to locate tx %v in block %v", + txid, blockHash) + } + + txConf := chainntnfs.TxConfirmation{ BlockHash: blockHash, BlockHeight: uint32(block.Height), - TxIndex: txIndex, + TxIndex: uint32(txIndex), } - - // If the transaction has more that enough confirmations, then we can - // dispatch it immediately after obtaining for information w.r.t - // exactly *when* if got all its confirmations. - if uint32(tx.Confirmations) >= msg.numConfirmations { - chainntnfs.Log.Infof("Dispatching %v conf notification", - msg.numConfirmations) - msg.finConf <- confDetails - return true - } - - // Otherwise, the transaction has only been *partially* confirmed, so - // we need to insert it into the confirmation heap. - // Find the block height at which this transaction will be confirmed - confHeight := uint32(block.Height) + msg.numConfirmations - 1 - heapEntry := &confEntry{ - msg, - confDetails, - confHeight, - } - heap.Push(b.confHeap, heapEntry) - - return true + return &txConf, nil } // notifyBlockEpochs notifies all registered block epoch clients of the newly @@ -517,92 +480,6 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash } } -// notifyConfs examines the current confirmation heap, sending off any -// notifications which have been triggered by the connection of a new block at -// newBlockHeight. -func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { - // If the heap is empty, we have nothing to do. - if b.confHeap.Len() == 0 { - return - } - - // Traverse our confirmation heap. The heap is a - // min-heap, so the confirmation notification which requires - // the smallest block-height will always be at the top - // of the heap. If a confirmation notification is eligible - // for triggering, then fire it off, and check if another - // is eligible until there are no more eligible entries. - nextConf := heap.Pop(b.confHeap).(*confEntry) - for nextConf.triggerHeight <= uint32(newBlockHeight) { - chainntnfs.Log.Infof("Dispatching %v conf notification, "+ - "height=%v", nextConf.numConfirmations, newBlockHeight) - nextConf.finConf <- nextConf.initialConfDetails - - if b.confHeap.Len() == 0 { - return - } - - nextConf = heap.Pop(b.confHeap).(*confEntry) - } - - heap.Push(b.confHeap, nextConf) -} - -// checkConfirmationTrigger determines if the passed txSha included at blockHeight -// triggers any single confirmation notifications. In the event that the txid -// matches, yet needs additional confirmations, it is added to the confirmation -// heap to be triggered at a later time. -// TODO(roasbeef): perhaps lookup, then track by inputs instead? -func (b *BtcdNotifier) checkConfirmationTrigger(txSha *chainhash.Hash, - newTip *chainUpdate, txIndex int) { - - // If a confirmation notification has been registered - // for this txid, then either trigger a notification - // event if only a single confirmation notification was - // requested, or place the notification on the - // confirmation heap for future usage. - if confClients, ok := b.confNotifications[*txSha]; ok { - // Either all of the registered confirmations will be - // dispatched due to a single confirmation, or added to the - // conf head. Therefore we unconditionally delete the registered - // confirmations from the staging zone. - defer func() { - delete(b.confNotifications, *txSha) - }() - - for _, confClient := range confClients { - confDetails := &chainntnfs.TxConfirmation{ - BlockHash: newTip.blockHash, - BlockHeight: uint32(newTip.blockHeight), - TxIndex: uint32(txIndex), - } - - if confClient.numConfirmations == 1 { - chainntnfs.Log.Infof("Dispatching single conf "+ - "notification, sha=%v, height=%v", txSha, - newTip.blockHeight) - confClient.finConf <- confDetails - continue - } - - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confClient.initialConfirmHeight = uint32(newTip.blockHeight) - finalConfHeight := confClient.initialConfirmHeight + confClient.numConfirmations - 1 - heapEntry := &confEntry{ - confClient, - confDetails, - finalConfHeight, - } - heap.Push(b.confHeap, heapEntry) - } - } -} - // spendNotification couples a target outpoint along with the channel used for // notifications once a spend of the outpoint has been detected. type spendNotification struct { @@ -659,9 +536,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, transaction, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) if err != nil { jsonErr, ok := err.(*btcjson.RPCError) - switch { - case ok && jsonErr.Code == -5: - default: + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { return nil, err } } @@ -713,13 +588,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // confirmationNotification represents a client's intent to receive a // notification once the target txid reaches numConfirmations confirmations. type confirmationsNotification struct { - txid *chainhash.Hash - - initialConfirmHeight uint32 - numConfirmations uint32 - - finConf chan *chainntnfs.TxConfirmation - negativeConf chan int32 // TODO(roasbeef): re-org funny business + chainntnfs.ConfNtfn } // RegisterConfirmationsNtfn registers a notification with BtcdNotifier @@ -729,20 +598,18 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ - txid: txid, - numConfirmations: numConfs, - finConf: make(chan *chainntnfs.TxConfirmation, 1), - negativeConf: make(chan int32, 1), + chainntnfs.ConfNtfn{ + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(), + }, } select { case <-b.quit: return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: - return &chainntnfs.ConfirmationEvent{ - Confirmed: ntfn.finConf, - NegativeConf: ntfn.negativeConf, - }, nil + return ntfn.Event, nil } } diff --git a/chainntnfs/btcdnotify/confheap.go b/chainntnfs/btcdnotify/confheap.go deleted file mode 100644 index 84379c29c..000000000 --- a/chainntnfs/btcdnotify/confheap.go +++ /dev/null @@ -1,58 +0,0 @@ -package btcdnotify - -import "github.com/lightningnetwork/lnd/chainntnfs" - -// confEntry represents an entry in the min-confirmation heap. -type confEntry struct { - *confirmationsNotification - - initialConfDetails *chainntnfs.TxConfirmation - - triggerHeight uint32 -} - -// confirmationHeap is a list of confEntries sorted according to nearest -// "confirmation" height.Each entry within the min-confirmation heap is sorted -// according to the smallest delta from the current blockheight to the -// triggerHeight of the next entry confirmationHeap -type confirmationHeap struct { - items []*confEntry -} - -// newConfirmationHeap returns a new confirmationHeap with zero items. -func newConfirmationHeap() *confirmationHeap { - var confItems []*confEntry - return &confirmationHeap{confItems} -} - -// Len returns the number of items in the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Len() int { return len(c.items) } - -// Less returns whether the item in the priority queue with index i should sort -// before the item with index j. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Less(i, j int) bool { - return c.items[i].triggerHeight < c.items[j].triggerHeight -} - -// Swap swaps the items at the passed indices in the priority queue. It is -// part of the heap.Interface implementation. -func (c *confirmationHeap) Swap(i, j int) { - c.items[i], c.items[j] = c.items[j], c.items[i] -} - -// Push pushes the passed item onto the priority queue. It is part of the -// heap.Interface implementation. -func (c *confirmationHeap) Push(x interface{}) { - c.items = append(c.items, x.(*confEntry)) -} - -// Pop removes the highest priority item (according to Less) from the priority -// queue and returns it. It is part of the heap.Interface implementation. -func (c *confirmationHeap) Pop() interface{} { - n := len(c.items) - x := c.items[n-1] - c.items[n-1] = nil - c.items = c.items[0 : n-1] - return x -}