chainntnfs/txconfnotifier: isolate scanning ntfns

This commit is contained in:
Conner Fromknecht 2018-08-24 19:29:55 -07:00
parent 2f0b5596da
commit 1babec971f
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

View File

@ -77,7 +77,7 @@ type TxConfNotifier struct {
// confNotifications is an index of notification requests by transaction // confNotifications is an index of notification requests by transaction
// hash. // hash.
confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn confNotifications map[chainhash.Hash]*confNtfnSet
// txsByInitialHeight is an index of watched transactions by the height // txsByInitialHeight is an index of watched transactions by the height
// that they are included at in the blockchain. This is tracked so that // that they are included at in the blockchain. This is tracked so that
@ -122,6 +122,26 @@ const (
rescanComplete rescanComplete
) )
// confNtfnSet holds all known, registered confirmation notifications for a
// single txid. If duplicates notifications are requested, only one historical
// dispatch will be spawned to ensure redundant scans are not permitted. A
// single conf detail will be constructed and dispatched to all interested
// clients.
type confNtfnSet struct {
ntfns map[uint64]*ConfNtfn
rescanStatus rescanState
details *TxConfirmation
}
// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients
// interested in a notification for a particular txid.
func newConfNtfnSet() *confNtfnSet {
return &confNtfnSet{
ntfns: make(map[uint64]*ConfNtfn),
rescanStatus: rescanNotStarted,
}
}
// NewTxConfNotifier creates a TxConfNotifier. The current height of the // NewTxConfNotifier creates a TxConfNotifier. The current height of the
// blockchain is accepted as a parameter. // blockchain is accepted as a parameter.
func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32,
@ -130,7 +150,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32,
return &TxConfNotifier{ return &TxConfNotifier{
currentHeight: startHeight, currentHeight: startHeight,
reorgSafetyLimit: reorgSafetyLimit, reorgSafetyLimit: reorgSafetyLimit,
confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), confNotifications: make(map[chainhash.Hash]*confNtfnSet),
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
hintCache: hintCache, hintCache: hintCache,
@ -145,35 +165,58 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32,
// the confirmation details must be provided with the UpdateConfDetails method, // the confirmation details must be provided with the UpdateConfDetails method,
// otherwise we will wait for the transaction to confirm even though it already // otherwise we will wait for the transaction to confirm even though it already
// has. // has.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) (bool, uint32, error) {
select { select {
case <-tcn.quit: case <-tcn.quit:
return ErrTxConfNotifierExiting return false, 0, ErrTxConfNotifierExiting
default: default:
} }
tcn.Lock() tcn.Lock()
defer tcn.Unlock() defer tcn.Unlock()
ntfns, ok := tcn.confNotifications[*ntfn.TxID] // TODO(conner): promote immediately to confNotifications if a
if !ok { // historical dispatch has already completed.
ntfns = make(map[uint64]*ConfNtfn)
tcn.confNotifications[*ntfn.TxID] = ntfns
err := tcn.hintCache.CommitConfirmHint( confSet, ok := tcn.confNotifications[*ntfn.TxID]
tcn.currentHeight, *ntfn.TxID, if !ok {
) confSet = newConfNtfnSet()
if err != nil { tcn.confNotifications[*ntfn.TxID] = confSet
// The error is not fatal, so we should not return an
// error to the caller.
Log.Errorf("Unable to update confirm hint to %d for "+
"%v: %v", tcn.currentHeight, *ntfn.TxID, err)
}
} }
ntfns[ntfn.ConfID] = ntfn confSet.ntfns[ntfn.ConfID] = ntfn
return nil switch confSet.rescanStatus {
// A prior rescan has already completed and we are actively watching at
// tip for this txid.
case rescanComplete:
return nil, nil
// A rescan is already in progress, return here to prevent dispatching
// another. When the scan returns, this notifications details will be
// updated as well.
case rescanPending:
return nil, nil
// If no rescan has been dispatched, attempt to do so now.
case rescanNotStarted:
}
// If the provided or cached height hint indicates that the transaction
// is to be confirmed at a height greater than the conf notifier's
// current height, we'll refrain from spawning a historical dispatch.
if startHeight > tcn.currentHeight {
// Set the rescan status to complete, which will allow the conf
// notifier to start delivering messages for this set
// immediately.
confSet.rescanStatus = rescanComplete
return nil, nil
}
// Set this confSet's status to pending, ensuring subsequent
// registrations don't also attempt a dispatch.
confSet.rescanStatus = rescanPending
} }
// UpdateConfDetails attempts to update the confirmation details for an active // UpdateConfDetails attempts to update the confirmation details for an active
@ -198,19 +241,21 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
// First, we'll determine whether we have an active notification for // First, we'll determine whether we have an active notification for
// this transaction with the given ID. // this transaction with the given ID.
ntfns, ok := tcn.confNotifications[txid] confSet, ok := tcn.confNotifications[txid]
if !ok { if !ok {
return fmt.Errorf("no notifications found for txid %v", txid) return fmt.Errorf("no notification found with TxID %v", txid)
} }
ntfn, ok := ntfns[clientID] // The historical dispatch has been completed for this confSet. We'll
if !ok { // update the rescan status and cache any details that were found. If
return fmt.Errorf("no notification found with ID %v", clientID) // the details are nil, that implies we did not find them and will
} // continue to watch for them at tip.
confSet.rescanStatus = rescanComplete
// If the notification has already recognized that the transaction // The notifier has yet to reach the height at which the transaction was
// confirmed, there's nothing left for us to do. // included in a block, so we should defer until handling it then within
if ntfn.details != nil { // ConnectTip.
if details == nil || details.BlockHeight > tcn.currentHeight {
return nil return nil
} }
@ -222,67 +267,72 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
details.BlockHeight, txid, err) details.BlockHeight, txid, err)
} }
// The notifier has yet to reach the height at which the transaction was // Update the conf details of all ntfns that don't yet have them.
// included in a block, so we should defer until handling it then within for _, ntfn := range confSet.ntfns {
// ConnectTip. if ntfn.details != nil {
if details == nil || details.BlockHeight > tcn.currentHeight { continue
return nil
}
ntfn.details = details
// Now, we'll examine whether the transaction of this notification
// request has reached its required number of confirmations. If it has,
// we'll disaptch a confirmation notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID)
// We'll send a 0 value to the Updates channel, indicating that
// the transaction has already been confirmed.
select {
case ntfn.Event.Updates <- 0:
case <-tcn.quit:
return ErrTxConfNotifierExiting
} }
select { ntfn.details = details
case ntfn.Event.Confirmed <- details:
ntfn.dispatched = true
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
} else {
// Otherwise, we'll keep track of the notification request by
// the height at which we should dispatch the confirmation
// notification.
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
ntfnSet = make(map[*ConfNtfn]struct{})
tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet
}
ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many // Now, we'll examine whether the transaction of this
// confirmations are left for the transaction to be confirmed. // notification request has reached its required number of
numConfsLeft := confHeight - tcn.currentHeight // confirmations. If it has, we'll dispatch a confirmation
select { // notification to the caller.
case ntfn.Event.Updates <- numConfsLeft: confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
case <-tcn.quit: if confHeight <= tcn.currentHeight {
return ErrTxConfNotifierExiting Log.Infof("Dispatching %v conf notification for %v",
} ntfn.NumConfirmations, ntfn.TxID)
}
// As a final check, we'll also watch the transaction if it's still // We'll send a 0 value to the Updates channel,
// possible for it to get reorged out of the chain. // indicating that the transaction has already been
if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { // confirmed.
txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] select {
if !exists { case ntfn.Event.Updates <- 0:
txSet = make(map[chainhash.Hash]struct{}) case <-tcn.quit:
tcn.txsByInitialHeight[details.BlockHeight] = txSet return ErrTxConfNotifierExiting
}
select {
case ntfn.Event.Confirmed <- details:
ntfn.dispatched = true
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
} else {
// Otherwise, we'll keep track of the notification
// request by the height at which we should dispatch the
// confirmation notification.
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists {
ntfnSet = make(map[*ConfNtfn]struct{})
tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet
}
ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many
// confirmations are left for the transaction to be
// confirmed.
numConfsLeft := confHeight - tcn.currentHeight
select {
case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit:
return ErrTxConfNotifierExiting
}
}
// As a final check, we'll also watch the transaction if it's
// still possible for it to get reorged out of the chain.
blockHeight := details.BlockHeight
reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit
if reorgSafeHeight > tcn.currentHeight {
txSet, exists := tcn.txsByInitialHeight[blockHeight]
if !exists {
txSet = make(map[chainhash.Hash]struct{})
tcn.txsByInitialHeight[blockHeight] = txSet
}
txSet[txid] = struct{}{}
} }
txSet[txid] = struct{}{}
} }
return nil return nil
@ -320,7 +370,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
// handled correctly. // handled correctly.
for _, tx := range txns { for _, tx := range txns {
txHash := tx.Hash() txHash := tx.Hash()
for _, ntfn := range tcn.confNotifications[*txHash] { confSet, ok := tcn.confNotifications[*txHash]
if !ok {
continue
}
for _, ntfn := range confSet.ntfns {
ntfn.details = &TxConfirmation{ ntfn.details = &TxConfirmation{
BlockHash: blockHash, BlockHash: blockHash,
BlockHeight: blockHeight, BlockHeight: blockHeight,
@ -356,7 +411,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
txsToUpdateHints = append(txsToUpdateHints, confirmedTx) txsToUpdateHints = append(txsToUpdateHints, confirmedTx)
} }
out: out:
for maybeUnconfirmedTx := range tcn.confNotifications { for maybeUnconfirmedTx, confSet := range tcn.confNotifications {
if confSet.rescanStatus != rescanComplete {
continue
}
for height, confirmedTxs := range tcn.txsByInitialHeight { for height, confirmedTxs := range tcn.txsByInitialHeight {
// Skip the transactions that confirmed at the new block // Skip the transactions that confirmed at the new block
// height as those have already been added. // height as those have already been added.
@ -391,7 +450,8 @@ out:
// this new height. // this new height.
for _, txHashes := range tcn.txsByInitialHeight { for _, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes { for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] { confSet := tcn.confNotifications[txHash]
for _, ntfn := range confSet.ntfns {
// If the notification hasn't learned about the // If the notification hasn't learned about the
// confirmation of its transaction yet (in the // confirmation of its transaction yet (in the
// case of historical confirmations), we'll skip // case of historical confirmations), we'll skip
@ -491,7 +551,8 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
// clients is always non-blocking. // clients is always non-blocking.
for initialHeight, txHashes := range tcn.txsByInitialHeight { for initialHeight, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes { for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] { confSet := tcn.confNotifications[txHash]
for _, ntfn := range confSet.ntfns {
// First, we'll attempt to drain an update // First, we'll attempt to drain an update
// from each notification to ensure sends to the // from each notification to ensure sends to the
// Updates channel are always non-blocking. // Updates channel are always non-blocking.
@ -544,6 +605,17 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
continue continue
} }
delete(ntfnSet, ntfn) delete(ntfnSet, ntfn)
// Intuitively, we should also remove
// the txHash from confNotifications if
// the ntfnSet is now empty. However, we
// will not do so since we may want to
// continue rewinding the height hints
// for this txid.
//
// NOTE(conner): safe to delete if
// blockHeight is below client-provided
// height hint?
} }
} }
} }
@ -565,8 +637,8 @@ func (tcn *TxConfNotifier) TearDown() {
close(tcn.quit) close(tcn.quit)
for _, ntfns := range tcn.confNotifications { for _, confSet := range tcn.confNotifications {
for _, ntfn := range ntfns { for _, ntfn := range confSet.ntfns {
if ntfn.dispatched { if ntfn.dispatched {
continue continue
} }