Merge pull request #1628 from wpaulino/async-rescans

chainntnfs: handle historical confs and spends asynchronously
This commit is contained in:
Olaoluwa Osuntokun 2018-07-31 21:05:34 -07:00 committed by GitHub
commit ad7f87ef18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 415 additions and 225 deletions

View File

@ -19,7 +19,6 @@ import (
) )
const ( const (
// notifierType uniquely identifies this concrete implementation of the // notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface. // ChainNotifier interface.
notifierType = "bitcoind" notifierType = "bitcoind"
@ -35,6 +34,11 @@ var (
// measure a spend notification when notifier is already stopped. // measure a spend notification when notifier is already stopped.
ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " + ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " +
"while attempting to register for spend notification.") "while attempting to register for spend notification.")
// ErrTransactionNotFound is an error returned when we attempt to find a
// transaction by manually scanning the chain within a specific range
// but it is not found.
ErrTransactionNotFound = errors.New("transaction not found within range")
) )
// chainUpdate encapsulates an update to the current main chain. This struct is // chainUpdate encapsulates an update to the current main chain. This struct is
@ -54,6 +58,7 @@ type chainUpdate struct {
// chain client. Multiple concurrent clients are supported. All notifications // chain client. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels. // are achieved via non-blocking sends on client channels.
type BitcoindNotifier struct { type BitcoindNotifier struct {
confClientCounter uint64 // To be used atomically.
spendClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically.
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
@ -236,33 +241,46 @@ out:
b.spendNotifications[op] = make(map[uint64]*spendNotification) b.spendNotifications[op] = make(map[uint64]*spendNotification)
} }
b.spendNotifications[op][msg.spendID] = msg b.spendNotifications[op][msg.spendID] = msg
b.chainConn.NotifySpent([]*wire.OutPoint{&op})
case *confirmationNotification: case *confirmationNotification:
chainntnfs.Log.Infof("New confirmation "+ chainntnfs.Log.Infof("New confirmation "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
_, currentHeight, err := b.chainConn.GetBestBlock() currentHeight := uint32(bestHeight)
// Look up whether the transaction is already
// included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
b.wg.Add(1)
go func() {
defer b.wg.Done()
confDetails, err := b.historicalConfDetails(
msg.TxID, msg.heightHint,
currentHeight,
)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
return
} }
// Lookup whether the transaction is already included in the if confDetails != nil {
// active chain. err := b.txConfNotifier.UpdateConfDetails(
txConf, err := b.historicalConfDetails( *msg.TxID, msg.ConfID,
msg.TxID, msg.heightHint, uint32(currentHeight), confDetails,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
} }
}()
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(msg, bestHeight) b.handleRelevantTx(msg, bestHeight)
} }
@ -473,6 +491,14 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash,
// Begin scanning blocks at every height to determine where the // Begin scanning blocks at every height to determine where the
// transaction was included in. // transaction was included in.
for height := heightHint; height <= currentHeight; height++ { for height := heightHint; height <= currentHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height)) blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get hash from block "+ return nil, fmt.Errorf("unable to get hash from block "+
@ -632,43 +658,22 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err return nil, err
} }
out: // In order to ensure we don't block the caller on what
for i := startHeight; i <= endHeight; i++ { // may be a long rescan, we'll launch a goroutine to do
blockHash, err := b.chainConn.GetBlockHash(int64(i)) // so in the background.
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := b.dispatchSpendDetailsManually(
*outpoint, startHeight, endHeight,
)
if err != nil { if err != nil {
return nil, err chainntnfs.Log.Errorf("Rescan for spend "+
} "notification txout(%x) "+
block, err := b.chainConn.GetBlock(blockHash) "failed: %v", outpoint, err)
if err != nil {
return nil, err
}
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint == *outpoint {
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: block.BlockHash(),
Height: i,
},
Time: block.Header.Timestamp,
},
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- relTx:
}
break out
}
}
}
} }
}()
} }
} }
@ -683,8 +688,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// Submit spend cancellation to notification dispatcher. // Submit spend cancellation to notification dispatcher.
select { select {
case b.notificationCancels <- cancel: case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is // Cancellation is being handled, drain the
// closed before yielding to the caller. // spend chan until it is closed before yielding
// to the caller.
for { for {
select { select {
case _, ok := <-ntfn.spendChan: case _, ok := <-ntfn.spendChan:
@ -701,6 +707,72 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
}, nil }, nil
} }
// disaptchSpendDetailsManually attempts to manually scan the chain within the
// given height range for a transaction that spends the given outpoint. If one
// is found, it's spending details are sent to the notifier dispatcher, which
// will then dispatch the notification to all of its clients.
func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
startHeight, endHeight int32) error {
// Begin scanning blocks at every height to determine if the outpoint
// was spent.
for height := startHeight; height <= endHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return err
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return err
}
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint != op {
continue
}
// If this transaction input spends the
// outpoint, we'll gather the details of the
// spending transaction and dispatch a spend
// notification to our clients.
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: *blockHash,
Height: height,
},
Time: block.Header.Timestamp,
},
}
select {
case b.notificationRegistry <- relTx:
case <-b.quit:
return ErrChainNotifierShuttingDown
}
return nil
}
}
}
return ErrTransactionNotFound
}
// confirmationNotification represents a client's intent to receive a // confirmationNotification represents a client's intent to receive a
// notification once the target txid reaches numConfirmations confirmations. // notification once the target txid reaches numConfirmations confirmations.
type confirmationNotification struct { type confirmationNotification struct {
@ -716,6 +788,7 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
ntfn := &confirmationNotification{ ntfn := &confirmationNotification{
ConfNtfn: chainntnfs.ConfNtfn{ ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),
TxID: txid, TxID: txid,
NumConfirmations: numConfs, NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs), Event: chainntnfs.NewConfirmationEvent(numConfs),
@ -723,11 +796,15 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn: case b.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }

View File

@ -61,6 +61,7 @@ type txUpdate struct {
// notifications. Multiple concurrent clients are supported. All notifications // notifications. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels. // are achieved via non-blocking sends on client channels.
type BtcdNotifier struct { type BtcdNotifier struct {
confClientCounter uint64 // To be used aotmically.
spendClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically.
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
@ -298,24 +299,42 @@ out:
b.spendNotifications[op] = make(map[uint64]*spendNotification) b.spendNotifications[op] = make(map[uint64]*spendNotification)
} }
b.spendNotifications[op][msg.spendID] = msg b.spendNotifications[op][msg.spendID] = msg
case *confirmationNotification: case *confirmationNotification:
chainntnfs.Log.Infof("New confirmation "+ chainntnfs.Log.Infof("New confirmation "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
// Lookup whether the transaction is already included in the bestHeight := uint32(currentHeight)
// active chain.
txConf, err := b.historicalConfDetails( // Look up whether the transaction is already
msg.TxID, msg.heightHint, uint32(currentHeight), // included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
b.wg.Add(1)
go func() {
defer b.wg.Done()
confDetails, err := b.historicalConfDetails(
msg.TxID, msg.heightHint,
bestHeight,
)
if err != nil {
chainntnfs.Log.Error(err)
return
}
if confDetails != nil {
err = b.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ConfID,
confDetails,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
} }
}()
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
@ -532,6 +551,14 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash,
// Begin scanning blocks at every height to determine where the // Begin scanning blocks at every height to determine where the
// transaction was included in. // transaction was included in.
for height := heightHint; height <= currentHeight; height++ { for height := heightHint; height <= currentHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height)) blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get hash from block "+ return nil, fmt.Errorf("unable to get hash from block "+
@ -800,6 +827,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
ntfn := &confirmationNotification{ ntfn := &confirmationNotification{
ConfNtfn: chainntnfs.ConfNtfn{ ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&b.confClientCounter, 1),
TxID: txid, TxID: txid,
NumConfirmations: numConfs, NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs), Event: chainntnfs.NewConfirmationEvent(numConfs),
@ -807,11 +835,15 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn: case b.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }

View File

@ -867,69 +867,14 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
// concrete implementations. // concrete implementations.
// //
// To do so, we first create a new output to our test target address. // To do so, we first create a new output to our test target address.
txid, err := getTestTxId(miner) outpoint, pkScript := createSpendableOutput(miner, t)
if err != nil {
t.Fatalf("unable to create test addr: %v", err)
}
err = waitForMempoolTx(miner, txid) // We'll then spend this output and broadcast the spend transaction.
if err != nil { spendingTx := createSpendTx(outpoint, pkScript, t)
t.Fatalf("tx not relayed to miner: %v", err)
}
// Mine a single block which should include that txid above.
if _, err := miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate single block: %v", err)
}
// Now that we have the txid, fetch the transaction itself.
wrappedTx, err := miner.Node.GetRawTransaction(txid)
if err != nil {
t.Fatalf("unable to get new tx: %v", err)
}
tx := wrappedTx.MsgTx()
// Locate the output index sent to us. We need this so we can construct
// a spending txn below.
outIndex := -1
var pkScript []byte
for i, txOut := range tx.TxOut {
if bytes.Contains(txOut.PkScript, testAddr.ScriptAddress()) {
pkScript = txOut.PkScript
outIndex = i
break
}
}
if outIndex == -1 {
t.Fatalf("unable to locate new output")
}
// Now that we've found the output index, register for a spentness
// notification for the newly created output.
outpoint := wire.NewOutPoint(txid, uint32(outIndex))
// Next, create a new transaction spending that output.
spendingTx := wire.NewMsgTx(1)
spendingTx.AddTxIn(&wire.TxIn{
PreviousOutPoint: *outpoint,
})
spendingTx.AddTxOut(&wire.TxOut{
Value: 1e8,
PkScript: pkScript,
})
sigScript, err := txscript.SignatureScript(spendingTx, 0, pkScript,
txscript.SigHashAll, privKey, true)
if err != nil {
t.Fatalf("unable to sign tx: %v", err)
}
spendingTx.TxIn[0].SignatureScript = sigScript
// Broadcast our spending transaction.
spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true) spenderSha, err := miner.Node.SendRawTransaction(spendingTx, true)
if err != nil { if err != nil {
t.Fatalf("unable to broadcast tx: %v", err) t.Fatalf("unable to broadcast tx: %v", err)
} }
err = waitForMempoolTx(miner, spenderSha) err = waitForMempoolTx(miner, spenderSha)
if err != nil { if err != nil {
t.Fatalf("tx not relayed to miner: %v", err) t.Fatalf("tx not relayed to miner: %v", err)

View File

@ -48,12 +48,13 @@ var (
// TODO(roasbeef): heavily consolidate with NeutrinoNotifier code // TODO(roasbeef): heavily consolidate with NeutrinoNotifier code
// * maybe combine into single package? // * maybe combine into single package?
type NeutrinoNotifier struct { type NeutrinoNotifier struct {
started int32 // To be used atomically. confClientCounter uint64 // To be used atomically.
stopped int32 // To be used atomically.
spendClientCounter uint64 // To be used atomically. spendClientCounter uint64 // To be used atomically.
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically.
stopped int32 // To be used atomically.
heightMtx sync.RWMutex heightMtx sync.RWMutex
bestHeight uint32 bestHeight uint32
@ -306,31 +307,48 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
currentHeight := n.bestHeight currentHeight := n.bestHeight
n.heightMtx.RUnlock() n.heightMtx.RUnlock()
// Lookup whether the transaction is already included in the // Look up whether the transaction is already
// active chain. // included in the active chain. We'll do this
txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, // in a goroutine to prevent blocking
msg.heightHint) // potentially long rescans.
n.wg.Add(1)
go func() {
defer n.wg.Done()
confDetails, err := n.historicalConfDetails(
msg.TxID, currentHeight,
msg.heightHint,
)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
if txConf == nil { if confDetails != nil {
// If we can't fully dispatch confirmation, err := n.txConfNotifier.UpdateConfDetails(
// then we'll update our filter so we can be *msg.TxID, msg.ConfID,
// notified of its future initial confirmation. confDetails,
)
if err != nil {
chainntnfs.Log.Error(err)
}
return
}
// If we can't fully dispatch
// confirmation, then we'll update our
// filter so we can be notified of its
// future initial confirmation.
rescanUpdate := []neutrino.UpdateOption{ rescanUpdate := []neutrino.UpdateOption{
neutrino.AddTxIDs(*msg.TxID), neutrino.AddTxIDs(*msg.TxID),
neutrino.Rewind(currentHeight), neutrino.Rewind(currentHeight),
} }
if err := n.chainView.Update(rescanUpdate...); err != nil { err = n.chainView.Update(rescanUpdate...)
chainntnfs.Log.Errorf("unable to update rescan: %v", err)
}
}
err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Errorf("Unable "+
"to update rescan: %v",
err)
} }
}()
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
@ -400,6 +418,14 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
// Starting from the height hint, we'll walk forwards in the chain to // Starting from the height hint, we'll walk forwards in the chain to
// see if this transaction has already been confirmed. // see if this transaction has already been confirmed.
for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
// First, we'll fetch the block header for this height so we // First, we'll fetch the block header for this height so we
// can compute the current block hash. // can compute the current block hash.
header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight)
@ -696,6 +722,7 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
ntfn := &confirmationsNotification{ ntfn := &confirmationsNotification{
ConfNtfn: chainntnfs.ConfNtfn{ ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&n.confClientCounter, 1),
TxID: txid, TxID: txid,
NumConfirmations: numConfs, NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs), Event: chainntnfs.NewConfirmationEvent(numConfs),
@ -703,11 +730,15 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
case n.notificationRegistry <- ntfn: case n.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }

View File

@ -3,15 +3,26 @@ package chainntnfs
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
) )
var (
// ErrTxConfNotifierExiting is an error returned when attempting to
// interact with the TxConfNotifier but it been shut down.
ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting")
)
// ConfNtfn represents a notifier client's request to receive a notification // ConfNtfn represents a notifier client's request to receive a notification
// once the target transaction gets sufficient confirmations. The client is // once the target transaction gets sufficient confirmations. The client is
// asynchronously notified via the ConfirmationEvent channels. // asynchronously notified via the ConfirmationEvent channels.
type ConfNtfn struct { type ConfNtfn struct {
// ConfID uniquely identifies the confirmation notification request for
// the specified transaction.
ConfID uint64
// TxID is the hash of the transaction for which confirmation notifications // TxID is the hash of the transaction for which confirmation notifications
// are requested. // are requested.
TxID *chainhash.Hash TxID *chainhash.Hash
@ -66,7 +77,7 @@ type TxConfNotifier struct {
// confNotifications is an index of notification requests by transaction // confNotifications is an index of notification requests by transaction
// hash. // hash.
confNotifications map[chainhash.Hash][]*ConfNtfn confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn
// txsByInitialHeight is an index of watched transactions by the height // txsByInitialHeight is an index of watched transactions by the height
// that they are included at in the blockchain. This is tracked so that // that they are included at in the blockchain. This is tracked so that
@ -81,6 +92,8 @@ type TxConfNotifier struct {
// quit is closed in order to signal that the notifier is gracefully // quit is closed in order to signal that the notifier is gracefully
// exiting. // exiting.
quit chan struct{} quit chan struct{}
sync.Mutex
} }
// NewTxConfNotifier creates a TxConfNotifier. The current height of the // NewTxConfNotifier creates a TxConfNotifier. The current height of the
@ -89,7 +102,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif
return &TxConfNotifier{ return &TxConfNotifier{
currentHeight: startHeight, currentHeight: startHeight,
reorgSafetyLimit: reorgSafetyLimit, reorgSafetyLimit: reorgSafetyLimit,
confNotifications: make(map[chainhash.Hash][]*ConfNtfn), confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn),
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -98,27 +111,83 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotif
// Register handles a new notification request. The client will be notified when // Register handles a new notification request. The client will be notified when
// the transaction gets a sufficient number of confirmations on the blockchain. // the transaction gets a sufficient number of confirmations on the blockchain.
// If the transaction has already been included in a block on the chain, the //
// confirmation details must be given as the txConf argument, otherwise it // NOTE: If the transaction has already been included in a block on the chain,
// should be nil. If the transaction already has the sufficient number of // the confirmation details must be provided with the UpdateConfDetails method,
// confirmations, this dispatches the notification immediately. // otherwise we will wait for the transaction to confirm even though it already
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) error { // has.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error {
select { select {
case <-tcn.quit: case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
default: default:
} }
if txConf == nil || txConf.BlockHeight > tcn.currentHeight { tcn.Lock()
// Transaction is unconfirmed. defer tcn.Unlock()
tcn.confNotifications[*ntfn.TxID] =
append(tcn.confNotifications[*ntfn.TxID], ntfn) ntfns, ok := tcn.confNotifications[*ntfn.TxID]
if !ok {
ntfns = make(map[uint64]*ConfNtfn)
tcn.confNotifications[*ntfn.TxID] = ntfns
}
ntfns[ntfn.ConfID] = ntfn
return nil
}
// UpdateConfDetails attempts to update the confirmation details for an active
// notification within the notifier. This should only be used in the case of a
// transaction that has confirmed before the notifier's current height.
//
// NOTE: The notification should be registered first to ensure notifications are
// dispatched correctly.
func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
clientID uint64, details *TxConfirmation) error {
select {
case <-tcn.quit:
return ErrTxConfNotifierExiting
default:
}
// Ensure we hold the lock throughout handling the notification to
// prevent the notifier from advancing its height underneath us.
tcn.Lock()
defer tcn.Unlock()
// First, we'll determine whether we have an active notification for
// this transaction with the given ID.
ntfns, ok := tcn.confNotifications[txid]
if !ok {
return fmt.Errorf("no notifications found for txid %v", txid)
}
ntfn, ok := ntfns[clientID]
if !ok {
return fmt.Errorf("no notification found with ID %v", clientID)
}
// If the notification has already recognized that the transaction
// confirmed, there's nothing left for us to do.
if ntfn.details != nil {
return nil return nil
} }
// If the transaction already has the required confirmations, we'll // The notifier has yet to reach the height at which the transaction was
// dispatch the notification immediately. // included in a block, so we should defer until handling it then within
confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 // ConnectTip.
if details == nil || details.BlockHeight > tcn.currentHeight {
return nil
}
ntfn.details = details
// Now, we'll examine whether the transaction of this notification
// request has reched its required number of confirmations. If it has,
// we'll disaptch a confirmation notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= tcn.currentHeight { if confHeight <= tcn.currentHeight {
Log.Infof("Dispatching %v conf notification for %v", Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.TxID)
@ -126,21 +195,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
// We'll send a 0 value to the Updates channel, indicating that // We'll send a 0 value to the Updates channel, indicating that
// the transaction has already been confirmed. // the transaction has already been confirmed.
select { select {
case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Updates <- 0: case ntfn.Event.Updates <- 0:
case <-tcn.quit:
return ErrTxConfNotifierExiting
} }
select { select {
case <-tcn.quit: case ntfn.Event.Confirmed <- details:
return fmt.Errorf("TxConfNotifier is exiting")
case ntfn.Event.Confirmed <- txConf:
ntfn.dispatched = true ntfn.dispatched = true
case <-tcn.quit:
return ErrTxConfNotifierExiting
} }
} else { } else {
// Otherwise, we'll record the transaction along with the height // Otherwise, we'll keep track of the notification request by
// at which we should notify the client. // the height at which we should dispatch the confirmation
ntfn.details = txConf // notification.
ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight]
if !exists { if !exists {
ntfnSet = make(map[*ConfNtfn]struct{}) ntfnSet = make(map[*ConfNtfn]struct{})
@ -154,22 +223,19 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro
select { select {
case ntfn.Event.Updates <- numConfsLeft: case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit: case <-tcn.quit:
return errors.New("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
} }
} }
// As a final check, we'll also watch the transaction if it's still // As a final check, we'll also watch the transaction if it's still
// possible for it to get reorganized out of the chain. // possible for it to get reorged out of the chain.
if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight {
tcn.confNotifications[*ntfn.TxID] = txSet, exists := tcn.txsByInitialHeight[details.BlockHeight]
append(tcn.confNotifications[*ntfn.TxID], ntfn)
txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight]
if !exists { if !exists {
txSet = make(map[chainhash.Hash]struct{}) txSet = make(map[chainhash.Hash]struct{})
tcn.txsByInitialHeight[txConf.BlockHeight] = txSet tcn.txsByInitialHeight[details.BlockHeight] = txSet
} }
txSet[*ntfn.TxID] = struct{}{} txSet[txid] = struct{}{}
} }
return nil return nil
@ -185,10 +251,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
select { select {
case <-tcn.quit: case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
default: default:
} }
tcn.Lock()
defer tcn.Unlock()
if blockHeight != tcn.currentHeight+1 { if blockHeight != tcn.currentHeight+1 {
return fmt.Errorf("Received blocks out of order: "+ return fmt.Errorf("Received blocks out of order: "+
"current height=%d, new height=%d", "current height=%d, new height=%d",
@ -234,8 +303,10 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
for _, txHashes := range tcn.txsByInitialHeight { for _, txHashes := range tcn.txsByInitialHeight {
for txHash := range txHashes { for txHash := range txHashes {
for _, ntfn := range tcn.confNotifications[txHash] { for _, ntfn := range tcn.confNotifications[txHash] {
// If the transaction still hasn't been included // If the notification hasn't learned about the
// in a block, we'll skip it. // confirmation of its transaction yet (in the
// case of historical confirmations), we'll skip
// it.
if ntfn.details == nil { if ntfn.details == nil {
continue continue
} }
@ -256,7 +327,7 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
select { select {
case ntfn.Event.Updates <- numConfsLeft: case ntfn.Event.Updates <- numConfsLeft:
case <-tcn.quit: case <-tcn.quit:
return errors.New("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
} }
} }
} }
@ -267,11 +338,12 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] {
Log.Infof("Dispatching %v conf notification for %v", Log.Infof("Dispatching %v conf notification for %v",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.TxID)
select { select {
case ntfn.Event.Confirmed <- ntfn.details: case ntfn.Event.Confirmed <- ntfn.details:
ntfn.dispatched = true ntfn.dispatched = true
case <-tcn.quit: case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
} }
} }
delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight)
@ -297,10 +369,13 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select { select {
case <-tcn.quit: case <-tcn.quit:
return fmt.Errorf("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
default: default:
} }
tcn.Lock()
defer tcn.Unlock()
if blockHeight != tcn.currentHeight { if blockHeight != tcn.currentHeight {
return fmt.Errorf("Received blocks out of order: "+ return fmt.Errorf("Received blocks out of order: "+
"current height=%d, disconnected height=%d", "current height=%d, disconnected height=%d",
@ -321,7 +396,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select { select {
case <-ntfn.Event.Updates: case <-ntfn.Event.Updates:
case <-tcn.quit: case <-tcn.quit:
return errors.New("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
default: default:
} }
@ -340,7 +415,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select { select {
case <-ntfn.Event.Confirmed: case <-ntfn.Event.Confirmed:
case <-tcn.quit: case <-tcn.quit:
return errors.New("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
default: default:
} }
@ -352,7 +427,7 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
select { select {
case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth):
case <-tcn.quit: case <-tcn.quit:
return errors.New("TxConfNotifier is exiting") return ErrTxConfNotifierExiting
} }
continue continue
@ -383,6 +458,9 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
// This closes the event channels of all registered notifications that have // This closes the event channels of all registered notifications that have
// not been dispatched yet. // not been dispatched yet.
func (tcn *TxConfNotifier) TearDown() { func (tcn *TxConfNotifier) TearDown() {
tcn.Lock()
defer tcn.Unlock()
close(tcn.quit) close(tcn.quit)
for _, ntfns := range tcn.confNotifications { for _, ntfns := range tcn.confNotifications {

View File

@ -3,10 +3,10 @@ package chainntnfs_test
import ( import (
"testing" "testing"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
) )
var zeroHash chainhash.Hash var zeroHash chainhash.Hash
@ -38,7 +38,9 @@ func TestTxConfFutureDispatch(t *testing.T) {
NumConfirmations: tx1NumConfs, NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
} }
txConfNotifier.Register(&ntfn1, nil) if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash() tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{ ntfn2 := chainntnfs.ConfNtfn{
@ -46,7 +48,9 @@ func TestTxConfFutureDispatch(t *testing.T) {
NumConfirmations: tx2NumConfs, NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
} }
txConfNotifier.Register(&ntfn2, nil) if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// We should not receive any notifications from both transactions // We should not receive any notifications from both transactions
// since they have not been included in a block yet. // since they have not been included in a block yet.
@ -202,32 +206,37 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
// starting height so that they are confirmed once registering them. // starting height so that they are confirmed once registering them.
tx1Hash := tx1.TxHash() tx1Hash := tx1.TxHash()
ntfn1 := chainntnfs.ConfNtfn{ ntfn1 := chainntnfs.ConfNtfn{
ConfID: 0,
TxID: &tx1Hash, TxID: &tx1Hash,
NumConfirmations: tx1NumConfs, NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
} }
if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{
ConfID: 1,
TxID: &tx2Hash,
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Update tx1 with its confirmation details. We should only receive one
// update since it only requires one confirmation and it already met it.
txConf1 := chainntnfs.TxConfirmation{ txConf1 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash, BlockHash: &zeroHash,
BlockHeight: 9, BlockHeight: 9,
TxIndex: 1, TxIndex: 1,
} }
txConfNotifier.Register(&ntfn1, &txConf1) err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1)
if err != nil {
tx2Hash := tx2.TxHash() t.Fatalf("unable to update conf details: %v", err)
txConf2 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash,
BlockHeight: 9,
TxIndex: 2,
} }
ntfn2 := chainntnfs.ConfNtfn{
TxID: &tx2Hash,
NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
}
txConfNotifier.Register(&ntfn2, &txConf2)
// We should only receive one update for tx1 since it only requires
// one confirmation and it already met it.
select { select {
case numConfsLeft := <-ntfn1.Event.Updates: case numConfsLeft := <-ntfn1.Event.Updates:
const expected = 0 const expected = 0
@ -240,8 +249,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatal("Expected confirmation update for tx1") t.Fatal("Expected confirmation update for tx1")
} }
// A confirmation notification for tx1 should be dispatched, as it met // A confirmation notification for tx1 should also be dispatched.
// its required number of confirmations.
select { select {
case txConf := <-ntfn1.Event.Confirmed: case txConf := <-ntfn1.Event.Confirmed:
assertEqualTxConf(t, txConf, &txConf1) assertEqualTxConf(t, txConf, &txConf1)
@ -249,8 +257,19 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatalf("Expected confirmation for tx1") t.Fatalf("Expected confirmation for tx1")
} }
// We should only receive one update indicating how many confirmations // Update tx2 with its confirmation details. This should not trigger a
// are left for the transaction to be confirmed. // confirmation notification since it hasn't reached its required number
// of confirmations, but we should receive a confirmation update
// indicating how many confirmation are left.
txConf2 := chainntnfs.TxConfirmation{
BlockHash: &zeroHash,
BlockHeight: 9,
TxIndex: 2,
}
err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2)
if err != nil {
t.Fatalf("unable to update conf details: %v", err)
}
select { select {
case numConfsLeft := <-ntfn2.Event.Updates: case numConfsLeft := <-ntfn2.Event.Updates:
const expected = 1 const expected = 1
@ -263,8 +282,6 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
t.Fatal("Expected confirmation update for tx2") t.Fatal("Expected confirmation update for tx2")
} }
// A confirmation notification for tx2 should not be dispatched yet, as
// it requires one more confirmation.
select { select {
case txConf := <-ntfn2.Event.Confirmed: case txConf := <-ntfn2.Event.Confirmed:
t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) t.Fatalf("Received unexpected confirmation for tx2: %v", txConf)
@ -277,7 +294,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
Transactions: []*wire.MsgTx{&tx3}, Transactions: []*wire.MsgTx{&tx3},
}) })
err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions())
if err != nil { if err != nil {
t.Fatalf("Failed to connect block: %v", err) t.Fatalf("Failed to connect block: %v", err)
} }
@ -343,7 +360,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx1NumConfs, NumConfirmations: tx1NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), Event: chainntnfs.NewConfirmationEvent(tx1NumConfs),
} }
txConfNotifier.Register(&ntfn1, nil) if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Tx 2 will be confirmed in block 10 and requires 1 conf. // Tx 2 will be confirmed in block 10 and requires 1 conf.
tx2Hash := tx2.TxHash() tx2Hash := tx2.TxHash()
@ -352,7 +371,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx2NumConfs, NumConfirmations: tx2NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), Event: chainntnfs.NewConfirmationEvent(tx2NumConfs),
} }
txConfNotifier.Register(&ntfn2, nil) if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Tx 3 will be confirmed in block 10 and requires 2 confs. // Tx 3 will be confirmed in block 10 and requires 2 confs.
tx3Hash := tx3.TxHash() tx3Hash := tx3.TxHash()
@ -361,7 +382,9 @@ func TestTxConfChainReorg(t *testing.T) {
NumConfirmations: tx3NumConfs, NumConfirmations: tx3NumConfs,
Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), Event: chainntnfs.NewConfirmationEvent(tx3NumConfs),
} }
txConfNotifier.Register(&ntfn3, nil) if err := txConfNotifier.Register(&ntfn3); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Sync chain to block 10. Txs 1 & 2 should be confirmed. // Sync chain to block 10. Txs 1 & 2 should be confirmed.
block1 := btcutil.NewBlock(&wire.MsgBlock{ block1 := btcutil.NewBlock(&wire.MsgBlock{
@ -581,7 +604,9 @@ func TestTxConfTearDown(t *testing.T) {
NumConfirmations: 1, NumConfirmations: 1,
Event: chainntnfs.NewConfirmationEvent(1), Event: chainntnfs.NewConfirmationEvent(1),
} }
txConfNotifier.Register(&ntfn1, nil) if err := txConfNotifier.Register(&ntfn1); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
tx2Hash := tx2.TxHash() tx2Hash := tx2.TxHash()
ntfn2 := chainntnfs.ConfNtfn{ ntfn2 := chainntnfs.ConfNtfn{
@ -589,7 +614,9 @@ func TestTxConfTearDown(t *testing.T) {
NumConfirmations: 2, NumConfirmations: 2,
Event: chainntnfs.NewConfirmationEvent(2), Event: chainntnfs.NewConfirmationEvent(2),
} }
txConfNotifier.Register(&ntfn2, nil) if err := txConfNotifier.Register(&ntfn2); err != nil {
t.Fatalf("unable to register ntfn: %v", err)
}
// Include the transactions in a block and add it to the TxConfNotifier. // Include the transactions in a block and add it to the TxConfNotifier.
// This should confirm tx1, but not tx2. // This should confirm tx1, but not tx2.