From 1babec971f974319093691dafc81a7475781ca94 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 24 Aug 2018 19:29:55 -0700 Subject: [PATCH] chainntnfs/txconfnotifier: isolate scanning ntfns --- chainntnfs/txconfnotifier.go | 252 ++++++++++++++++++++++------------- 1 file changed, 162 insertions(+), 90 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 4914c2ff4..099066c1f 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -77,7 +77,7 @@ type TxConfNotifier struct { // confNotifications is an index of notification requests by transaction // hash. - confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn + confNotifications map[chainhash.Hash]*confNtfnSet // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that @@ -122,6 +122,26 @@ const ( rescanComplete ) +// confNtfnSet holds all known, registered confirmation notifications for a +// single txid. If duplicates notifications are requested, only one historical +// dispatch will be spawned to ensure redundant scans are not permitted. A +// single conf detail will be constructed and dispatched to all interested +// clients. +type confNtfnSet struct { + ntfns map[uint64]*ConfNtfn + rescanStatus rescanState + details *TxConfirmation +} + +// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients +// interested in a notification for a particular txid. +func newConfNtfnSet() *confNtfnSet { + return &confNtfnSet{ + ntfns: make(map[uint64]*ConfNtfn), + rescanStatus: rescanNotStarted, + } +} + // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, @@ -130,7 +150,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), + confNotifications: make(map[chainhash.Hash]*confNtfnSet), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), hintCache: hintCache, @@ -145,35 +165,58 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, // the confirmation details must be provided with the UpdateConfDetails method, // otherwise we will wait for the transaction to confirm even though it already // has. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { +func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) { select { case <-tcn.quit: - return ErrTxConfNotifierExiting + return false, 0, ErrTxConfNotifierExiting default: } tcn.Lock() defer tcn.Unlock() - ntfns, ok := tcn.confNotifications[*ntfn.TxID] - if !ok { - ntfns = make(map[uint64]*ConfNtfn) - tcn.confNotifications[*ntfn.TxID] = ntfns + // TODO(conner): promote immediately to confNotifications if a + // historical dispatch has already completed. - err := tcn.hintCache.CommitConfirmHint( - tcn.currentHeight, *ntfn.TxID, - ) - if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. - Log.Errorf("Unable to update confirm hint to %d for "+ - "%v: %v", tcn.currentHeight, *ntfn.TxID, err) - } + confSet, ok := tcn.confNotifications[*ntfn.TxID] + if !ok { + confSet = newConfNtfnSet() + tcn.confNotifications[*ntfn.TxID] = confSet } - ntfns[ntfn.ConfID] = ntfn + confSet.ntfns[ntfn.ConfID] = ntfn - return nil + switch confSet.rescanStatus { + + // A prior rescan has already completed and we are actively watching at + // tip for this txid. + case rescanComplete: + return nil, nil + + // A rescan is already in progress, return here to prevent dispatching + // another. When the scan returns, this notifications details will be + // updated as well. + case rescanPending: + return nil, nil + + // If no rescan has been dispatched, attempt to do so now. + case rescanNotStarted: + } + + // If the provided or cached height hint indicates that the transaction + // is to be confirmed at a height greater than the conf notifier's + // current height, we'll refrain from spawning a historical dispatch. + if startHeight > tcn.currentHeight { + // Set the rescan status to complete, which will allow the conf + // notifier to start delivering messages for this set + // immediately. + confSet.rescanStatus = rescanComplete + return nil, nil + } + + // Set this confSet's status to pending, ensuring subsequent + // registrations don't also attempt a dispatch. + confSet.rescanStatus = rescanPending } // UpdateConfDetails attempts to update the confirmation details for an active @@ -198,19 +241,21 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, // First, we'll determine whether we have an active notification for // this transaction with the given ID. - ntfns, ok := tcn.confNotifications[txid] + confSet, ok := tcn.confNotifications[txid] if !ok { - return fmt.Errorf("no notifications found for txid %v", txid) + return fmt.Errorf("no notification found with TxID %v", txid) } - ntfn, ok := ntfns[clientID] - if !ok { - return fmt.Errorf("no notification found with ID %v", clientID) - } + // The historical dispatch has been completed for this confSet. We'll + // update the rescan status and cache any details that were found. If + // the details are nil, that implies we did not find them and will + // continue to watch for them at tip. + confSet.rescanStatus = rescanComplete - // If the notification has already recognized that the transaction - // confirmed, there's nothing left for us to do. - if ntfn.details != nil { + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil || details.BlockHeight > tcn.currentHeight { return nil } @@ -222,67 +267,72 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, details.BlockHeight, txid, err) } - // The notifier has yet to reach the height at which the transaction was - // included in a block, so we should defer until handling it then within - // ConnectTip. - if details == nil || details.BlockHeight > tcn.currentHeight { - return nil - } - - ntfn.details = details - - // Now, we'll examine whether the transaction of this notification - // request has reached its required number of confirmations. If it has, - // we'll disaptch a confirmation notification to the caller. - confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 - if confHeight <= tcn.currentHeight { - Log.Infof("Dispatching %v conf notification for %v", - ntfn.NumConfirmations, ntfn.TxID) - - // We'll send a 0 value to the Updates channel, indicating that - // the transaction has already been confirmed. - select { - case ntfn.Event.Updates <- 0: - case <-tcn.quit: - return ErrTxConfNotifierExiting + // Update the conf details of all ntfns that don't yet have them. + for _, ntfn := range confSet.ntfns { + if ntfn.details != nil { + continue } - select { - case ntfn.Event.Confirmed <- details: - ntfn.dispatched = true - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - } else { - // Otherwise, we'll keep track of the notification request by - // the height at which we should dispatch the confirmation - // notification. - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - ntfnSet = make(map[*ConfNtfn]struct{}) - tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet - } - ntfnSet[ntfn] = struct{}{} + ntfn.details = details - // We'll also send an update to the client of how many - // confirmations are left for the transaction to be confirmed. - numConfsLeft := confHeight - tcn.currentHeight - select { - case ntfn.Event.Updates <- numConfsLeft: - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - } + // Now, we'll examine whether the transaction of this + // notification request has reached its required number of + // confirmations. If it has, we'll dispatch a confirmation + // notification to the caller. + confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 + if confHeight <= tcn.currentHeight { + Log.Infof("Dispatching %v conf notification for %v", + ntfn.NumConfirmations, ntfn.TxID) - // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorged out of the chain. - if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] - if !exists { - txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[details.BlockHeight] = txSet + // We'll send a 0 value to the Updates channel, + // indicating that the transaction has already been + // confirmed. + select { + case ntfn.Event.Updates <- 0: + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + + select { + case ntfn.Event.Confirmed <- details: + ntfn.dispatched = true + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + } else { + // Otherwise, we'll keep track of the notification + // request by the height at which we should dispatch the + // confirmation notification. + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + ntfnSet = make(map[*ConfNtfn]struct{}) + tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet + } + ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be + // confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + } + + // As a final check, we'll also watch the transaction if it's + // still possible for it to get reorged out of the chain. + blockHeight := details.BlockHeight + reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit + if reorgSafeHeight > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[blockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[blockHeight] = txSet + } + txSet[txid] = struct{}{} } - txSet[txid] = struct{}{} } return nil @@ -320,7 +370,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // handled correctly. for _, tx := range txns { txHash := tx.Hash() - for _, ntfn := range tcn.confNotifications[*txHash] { + confSet, ok := tcn.confNotifications[*txHash] + if !ok { + continue + } + + for _, ntfn := range confSet.ntfns { ntfn.details = &TxConfirmation{ BlockHash: blockHash, BlockHeight: blockHeight, @@ -356,7 +411,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, txsToUpdateHints = append(txsToUpdateHints, confirmedTx) } out: - for maybeUnconfirmedTx := range tcn.confNotifications { + for maybeUnconfirmedTx, confSet := range tcn.confNotifications { + if confSet.rescanStatus != rescanComplete { + continue + } + for height, confirmedTxs := range tcn.txsByInitialHeight { // Skip the transactions that confirmed at the new block // height as those have already been added. @@ -391,7 +450,8 @@ out: // this new height. for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { + confSet := tcn.confNotifications[txHash] + for _, ntfn := range confSet.ntfns { // If the notification hasn't learned about the // confirmation of its transaction yet (in the // case of historical confirmations), we'll skip @@ -491,7 +551,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // clients is always non-blocking. for initialHeight, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { + confSet := tcn.confNotifications[txHash] + for _, ntfn := range confSet.ntfns { // First, we'll attempt to drain an update // from each notification to ensure sends to the // Updates channel are always non-blocking. @@ -544,6 +605,17 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { continue } delete(ntfnSet, ntfn) + + // Intuitively, we should also remove + // the txHash from confNotifications if + // the ntfnSet is now empty. However, we + // will not do so since we may want to + // continue rewinding the height hints + // for this txid. + // + // NOTE(conner): safe to delete if + // blockHeight is below client-provided + // height hint? } } } @@ -565,8 +637,8 @@ func (tcn *TxConfNotifier) TearDown() { close(tcn.quit) - for _, ntfns := range tcn.confNotifications { - for _, ntfn := range ntfns { + for _, confSet := range tcn.confNotifications { + for _, ntfn := range confSet.ntfns { if ntfn.dispatched { continue }