diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 6a6d0e0a3..9c3c9818f 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -613,7 +613,7 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 21e7ae172..b254be875 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -709,7 +709,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index e1e9393d0..326a05ba6 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -88,8 +88,10 @@ type TxConfirmation struct { // ConfirmationEvent encapsulates a confirmation notification. With this struct, // callers can be notified of: the instance the target txid reaches the targeted -// number of confirmations, and also in the event that the original txid becomes -// disconnected from the blockchain as a result of a re-org. +// number of confirmations, how many confirmations are left for the target txid +// to be fully confirmed at every new block height, and also in the event that +// the original txid becomes disconnected from the blockchain as a result of a +// re-org. // // Once the txid reaches the specified number of confirmations, the 'Confirmed' // channel will be sent upon fulfilling the notification. @@ -103,6 +105,11 @@ type ConfirmationEvent struct { // details of the channel's confirmation. Confirmed chan *TxConfirmation // MUST be buffered. + // Updates is a channel that will sent upon, at every incremental + // confirmation, how many confirmations are left to declare the + // transaction as fully confirmed. + Updates chan uint32 // MUST be buffered. + // TODO(roasbeef): all goroutines on ln channel updates should also // have a struct chan that's closed if funding gets re-org out. Need // to sync, to request another confirmation event ntfn, then re-open diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index b10bc0702..231196fbd 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -698,7 +698,7 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, ConfNtfn: chainntnfs.ConfNtfn{ TxID: txid, NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(numConfs), }, heightHint: heightHint, } diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 6d9415531..b955bc76c 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -1,6 +1,7 @@ package chainntnfs import ( + "errors" "fmt" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -33,9 +34,10 @@ type ConfNtfn struct { // NewConfirmationEvent constructs a new ConfirmationEvent with newly opened // channels. -func NewConfirmationEvent() *ConfirmationEvent { +func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent { return &ConfirmationEvent{ Confirmed: make(chan *TxConfirmation, 1), + Updates: make(chan uint32, numConfs), NegativeConf: make(chan int32, 1), } } @@ -66,11 +68,11 @@ type TxConfNotifier struct { // hash. confNotifications map[chainhash.Hash][]*ConfNtfn - // confTxsByInitialHeight is an index of watched transactions by the height + // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that - // incorrect notifications are not sent if a transaction is reorganized out - // of the chain and so that negative confirmations can be recognized. - confTxsByInitialHeight map[uint32][]*chainhash.Hash + // incorrect notifications are not sent if a transaction is reorganized + // out of the chain and so that negative confirmations can be recognized. + txsByInitialHeight map[uint32]map[chainhash.Hash]struct{} // ntfnsByConfirmHeight is an index of notification requests by the height // at which the transaction will have sufficient confirmations. @@ -85,12 +87,12 @@ type TxConfNotifier struct { // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { return &TxConfNotifier{ - currentHeight: startHeight, - reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash][]*ConfNtfn), - confTxsByInitialHeight: make(map[uint32][]*chainhash.Hash), - ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), - quit: make(chan struct{}), + currentHeight: startHeight, + reorgSafetyLimit: reorgSafetyLimit, + confNotifications: make(map[chainhash.Hash][]*ConfNtfn), + txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), + ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + quit: make(chan struct{}), } } @@ -114,13 +116,21 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro return nil } - // If the transaction already has the required confirmations, dispatch - // notification immediately, otherwise record along with the height at - // which to notify. + // If the transaction already has the required confirmations, we'll + // dispatch the notification immediately. confHeight := txConf.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) + + // We'll send a 0 value to the Updates channel, indicating that + // the transaction has already been confirmed. + select { + case <-tcn.quit: + return fmt.Errorf("TxConfNotifier is exiting") + case ntfn.Event.Updates <- 0: + } + select { case <-tcn.quit: return fmt.Errorf("TxConfNotifier is exiting") @@ -128,6 +138,8 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro ntfn.dispatched = true } } else { + // Otherwise, we'll record the transaction along with the height + // at which we should notify the client. ntfn.details = txConf ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { @@ -135,16 +147,29 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn, txConf *TxConfirmation) erro tcn.ntfnsByConfirmHeight[confHeight] = ntfnSet } ntfnSet[ntfn] = struct{}{} + + // We'll also send an update to the client of how many + // confirmations are left for the transaction to be confirmed. + numConfsLeft := confHeight - tcn.currentHeight + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } } - // Unless the transaction is finalized, include transaction information in - // confNotifications and confTxsByInitialHeight in case the tx gets - // reorganized out of the chain. + // As a final check, we'll also watch the transaction if it's still + // possible for it to get reorganized out of the chain. if txConf.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { tcn.confNotifications[*ntfn.TxID] = append(tcn.confNotifications[*ntfn.TxID], ntfn) - tcn.confTxsByInitialHeight[txConf.BlockHeight] = - append(tcn.confTxsByInitialHeight[txConf.BlockHeight], ntfn.TxID) + + txSet, exists := tcn.txsByInitialHeight[txConf.BlockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[txConf.BlockHeight] = txSet + } + txSet[*ntfn.TxID] = struct{}{} } return nil @@ -172,10 +197,11 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, tcn.currentHeight++ tcn.reorgDepth = 0 - // Record any newly confirmed transactions in ntfnsByConfirmHeight so that - // notifications get dispatched when the tx gets sufficient confirmations. - // Also record txs in confTxsByInitialHeight so reorgs can be handled - // correctly. + // Record any newly confirmed transactions by their confirmed height so + // that notifications get dispatched when the transactions reach their + // required number of confirmations. We'll also watch these transactions + // at the height they were included in the chain so reorgs can be + // handled correctly. for _, tx := range txns { txHash := tx.Hash() for _, ntfn := range tcn.confNotifications[*txHash] { @@ -193,13 +219,51 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } ntfnSet[ntfn] = struct{}{} - tcn.confTxsByInitialHeight[blockHeight] = - append(tcn.confTxsByInitialHeight[blockHeight], tx.Hash()) + txSet, exists := tcn.txsByInitialHeight[blockHeight] + if !exists { + txSet = make(map[chainhash.Hash]struct{}) + tcn.txsByInitialHeight[blockHeight] = txSet + } + txSet[*txHash] = struct{}{} } } - // Dispatch notifications for all transactions that are considered confirmed - // at this new block height. + // Next, we'll dispatch an update to all of the notification clients for + // our watched transactions with the number of confirmations left at + // this new height. + for _, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // If the transaction still hasn't been included + // in a block, we'll skip it. + if ntfn.details == nil { + continue + } + + txConfHeight := ntfn.details.BlockHeight + + ntfn.NumConfirmations - 1 + numConfsLeft := txConfHeight - blockHeight + + // Since we don't clear notifications until + // transactions are no longer under the risk of + // being reorganized out of the chain, we'll + // skip sending updates for transactions that + // have already been confirmed. + if int32(numConfsLeft) < 0 { + continue + } + + select { + case ntfn.Event.Updates <- numConfsLeft: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + } + } + } + + // Then, we'll dispatch notifications for all the transactions that have + // become confirmed at this new block height. for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) @@ -213,14 +277,14 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, delete(tcn.ntfnsByConfirmHeight, tcn.currentHeight) // Clear entries from confNotifications and confTxsByInitialHeight. We - // assume that reorgs deeper than the reorg safety limit do not happen, so - // we can clear out entries for the block that is now mature. + // assume that reorgs deeper than the reorg safety limit do not happen, + // so we can clear out entries for the block that is now mature. if tcn.currentHeight >= tcn.reorgSafetyLimit { matureBlockHeight := tcn.currentHeight - tcn.reorgSafetyLimit - for _, txHash := range tcn.confTxsByInitialHeight[matureBlockHeight] { - delete(tcn.confNotifications, *txHash) + for txHash := range tcn.txsByInitialHeight[matureBlockHeight] { + delete(tcn.confNotifications, txHash) } - delete(tcn.confTxsByInitialHeight, matureBlockHeight) + delete(tcn.txsByInitialHeight, matureBlockHeight) } return nil @@ -245,34 +309,72 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { tcn.currentHeight-- tcn.reorgDepth++ - for _, txHash := range tcn.confTxsByInitialHeight[blockHeight] { - for _, ntfn := range tcn.confNotifications[*txHash] { - // If notification has been dispatched with sufficient - // confirmations, notify of the reversal. - if ntfn.dispatched { + // We'll go through all of our watched transactions and attempt to drain + // their notification channels to ensure sending notifications to the + // clients is always non-blocking. + for initialHeight, txHashes := range tcn.txsByInitialHeight { + for txHash := range txHashes { + for _, ntfn := range tcn.confNotifications[txHash] { + // First, we'll attempt to drain an update + // from each notification to ensure sends to the + // Updates channel are always non-blocking. select { - case <-ntfn.Event.Confirmed: - // Drain confirmation notification instead of sending - // negative conf if the receiver has not processed it yet. - // This ensures sends to the Confirmed channel are always - // non-blocking. - case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-ntfn.Event.Updates: case <-tcn.quit: - return fmt.Errorf("TxConfNotifier is exiting") + return errors.New("TxConfNotifier is exiting") + default: } - ntfn.dispatched = false - continue - } - confHeight := blockHeight + ntfn.NumConfirmations - 1 - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - continue + // Then, we'll check if the current transaction + // was included in the block currently being + // disconnected. If it was, we'll need to take + // some necessary precautions. + if initialHeight == blockHeight { + // If the transaction's confirmation notification + // has already been dispatched, we'll attempt to + // notify the client it was reorged out of the chain. + if ntfn.dispatched { + // Attempt to drain the confirmation notification + // to ensure sends to the Confirmed channel are + // always non-blocking. + select { + case <-ntfn.Event.Confirmed: + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + default: + } + + ntfn.dispatched = false + + // Send a negative confirmation notification to the + // client indicating how many blocks have been + // disconnected successively. + select { + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return errors.New("TxConfNotifier is exiting") + } + + continue + } + + // Otherwise, since the transactions was reorged out + // of the chain, we can safely remove its accompanying + // confirmation notification. + confHeight := blockHeight + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if !exists { + continue + } + delete(ntfnSet, ntfn) + } } - delete(ntfnSet, ntfn) } } - delete(tcn.confTxsByInitialHeight, blockHeight) + + // Finally, we can remove the transactions we're currently watching that + // were included in this block height. + delete(tcn.txsByInitialHeight, blockHeight) return nil } @@ -290,6 +392,7 @@ func (tcn *TxConfNotifier) TearDown() { } close(ntfn.Event.Confirmed) + close(ntfn.Event.Updates) close(ntfn.Event.NegativeConf) } } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index d60b52270..f1ef90301 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -16,7 +16,10 @@ var zeroHash chainhash.Hash func TestTxConfFutureDispatch(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + const ( + tx1NumConfs uint32 = 1 + tx2NumConfs uint32 = 2 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -24,43 +27,74 @@ func TestTxConfFutureDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + // Create the test transactions and register them with the + // TxConfNotifier before including them in a block to receive future + // notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConfNotifier.Register(&ntfn1, nil) tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, nil) + // We should not receive any notifications from both transactions + // since they have not been included in a block yet. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := txConfNotifier.ConnectTip(block1.Hash(), 11, block1.Transactions()) + err := txConfNotifier.ConnectTip( + block1.Hash(), 11, block1.Transactions(), + ) if err != nil { t.Fatalf("Failed to connect block: %v", err) } + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for this tranaction should be dispatched, + // as it only required one confirmation. select { case txConf := <-ntfn1.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -73,12 +107,30 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only has one + // confirmation so far and it requires two. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -88,12 +140,32 @@ func TestTxConfFutureDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update since the last at the new height, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, since it + // now meets its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -113,7 +185,10 @@ func TestTxConfFutureDispatch(t *testing.T) { func TestTxConfHistoricalDispatch(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + const ( + tx1NumConfs uint32 = 1 + tx2NumConfs uint32 = 3 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -121,11 +196,15 @@ func TestTxConfHistoricalDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + // Create the test transactions at a height before the TxConfNotifier's + // starting height so that they are confirmed once registering them. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConf1 := chainntnfs.TxConfirmation{ BlockHash: &zeroHash, @@ -142,11 +221,27 @@ func TestTxConfHistoricalDispatch(t *testing.T) { } ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 3, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, &txConf2) + // We should only receive one update for tx1 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn1.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx1 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx1") + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn1.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf1) @@ -154,12 +249,30 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update indicating how many confirmations + // are left for the transaction to be confirmed. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 1 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } + // Create a new block and add it to the TxConfNotifier at the next + // height. This should confirm tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx3}, }) @@ -169,12 +282,32 @@ func TestTxConfHistoricalDispatch(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should not receive any event notifications for tx1 since it has + // already been confirmed. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // We should only receive one update for tx2 since the last one, + // indicating how many confirmations are still left. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: assertEqualTxConf(t, txConf, &txConf2) @@ -189,7 +322,11 @@ func TestTxConfHistoricalDispatch(t *testing.T) { func TestTxConfChainReorg(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(8, 100) + const ( + tx1NumConfs uint32 = 2 + tx2NumConfs uint32 = 1 + tx3NumConfs uint32 = 2 + ) var ( tx1 = wire.MsgTx{Version: 1} @@ -197,12 +334,14 @@ func TestTxConfChainReorg(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100) + // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx1NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } txConfNotifier.Register(&ntfn1, nil) @@ -210,8 +349,8 @@ func TestTxConfChainReorg(t *testing.T) { tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, - NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx2NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } txConfNotifier.Register(&ntfn2, nil) @@ -219,8 +358,8 @@ func TestTxConfChainReorg(t *testing.T) { tx3Hash := tx3.TxHash() ntfn3 := chainntnfs.ConfNtfn{ TxID: &tx3Hash, - NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + NumConfirmations: tx3NumConfs, + Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } txConfNotifier.Register(&ntfn3, nil) @@ -228,7 +367,11 @@ func TestTxConfChainReorg(t *testing.T) { block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := txConfNotifier.ConnectTip(nil, 9, block1.Transactions()) + err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions()) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + err = txConfNotifier.ConnectTip(nil, 9, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -241,25 +384,57 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We should receive two updates for tx1 since it requires two + // confirmations and it has already met them. + for i := 0; i < 2; i++ { + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + } + + // A confirmation notification for tx1 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case <-ntfn2.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx2") } + // We should only receive one update for tx3 since it only has one + // confirmation so far and it requires two. + select { + case <-ntfn3.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx3") + } + + // A confirmation notification for tx3 should not be dispatched yet, as + // it requires one more confirmation. select { case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: } - // Block that tx2 and tx3 were included in is disconnected and two next + // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. err = txConfNotifier.DisconnectTip(10) if err != nil { @@ -286,19 +461,28 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected negative conf notification for tx1") } + // We should not receive any event notifications from all of the + // transactions because tx1 has already been confirmed and tx2 and tx3 + // have not been included in the chain since the reorg. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } select { + case <-ntfn2.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx2") case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } select { + case <-ntfn3.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx3") case txConf := <-ntfn3.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx3: %v", txConf) default: @@ -320,7 +504,22 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } - // Both transactions should be newly confirmed. + // We should only receive one update for tx2 since it only requires + // one confirmation and it already met it. + select { + case numConfsLeft := <-ntfn2.Event.Updates: + const expected = 0 + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx2 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + + // A confirmation notification for tx2 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn2.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -333,6 +532,24 @@ func TestTxConfChainReorg(t *testing.T) { t.Fatalf("Expected confirmation for tx2") } + // We should receive two updates for tx3 since it requires two + // confirmations and it has already met them. + for i := uint32(1); i <= 2; i++ { + select { + case numConfsLeft := <-ntfn3.Event.Updates: + expected := tx3NumConfs - i + if numConfsLeft != expected { + t.Fatalf("Received incorrect confirmation update: tx3 "+ + "expected %d confirmations left, got %d", + expected, numConfsLeft) + } + default: + t.Fatal("Expected confirmation update for tx2") + } + } + + // A confirmation notification for tx3 should be dispatched, as it met + // its required number of confirmations. select { case txConf := <-ntfn3.Event.Confirmed: expectedConf := chainntnfs.TxConfirmation{ @@ -349,18 +566,20 @@ func TestTxConfChainReorg(t *testing.T) { func TestTxConfTearDown(t *testing.T) { t.Parallel() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) - var ( tx1 = wire.MsgTx{Version: 1} tx2 = wire.MsgTx{Version: 2} ) + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + + // Create the test transactions and register them with the + // TxConfNotifier to receive notifications. tx1Hash := tx1.TxHash() ntfn1 := chainntnfs.ConfNtfn{ TxID: &tx1Hash, NumConfirmations: 1, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(1), } txConfNotifier.Register(&ntfn1, nil) @@ -368,10 +587,12 @@ func TestTxConfTearDown(t *testing.T) { ntfn2 := chainntnfs.ConfNtfn{ TxID: &tx2Hash, NumConfirmations: 2, - Event: chainntnfs.NewConfirmationEvent(), + Event: chainntnfs.NewConfirmationEvent(2), } txConfNotifier.Register(&ntfn2, nil) + // Include the transactions in a block and add it to the TxConfNotifier. + // This should confirm tx1, but not tx2. block := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1, &tx2}, }) @@ -381,35 +602,62 @@ func TestTxConfTearDown(t *testing.T) { t.Fatalf("Failed to connect block: %v", err) } + // We do not care about the correctness of the notifications since they + // are tested in other methods, but we'll still attempt to retrieve them + // for the sake of not being able to later once the notification + // channels are closed. + select { + case <-ntfn1.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx1") + } + select { case <-ntfn1.Event.Confirmed: default: t.Fatalf("Expected confirmation for tx1") } + select { + case <-ntfn2.Event.Updates: + default: + t.Fatal("Expected confirmation update for tx2") + } + select { case txConf := <-ntfn2.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx2: %v", txConf) default: } - // Confirmed channels should be closed for notifications that have not been - // dispatched yet. + // The notification channels should be closed for notifications that + // have not been dispatched yet, so we should not expect to receive any + // more updates. txConfNotifier.TearDown() + // tx1 should not receive any more updates because it has already been + // confirmed and the TxConfNotifier has been shut down. select { + case <-ntfn1.Event.Updates: + t.Fatal("Received unexpected confirmation update for tx1") case txConf := <-ntfn1.Event.Confirmed: t.Fatalf("Received unexpected confirmation for tx1: %v", txConf) default: } + // tx2 should not receive any more updates after the notifications + // channels have been closed and the TxConfNotifier shut down. select { + case _, more := <-ntfn2.Event.Updates: + if more { + t.Fatal("Expected closed Updates channel for tx2") + } case _, more := <-ntfn2.Event.Confirmed: if more { - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed Confirmed channel for tx2") } default: - t.Fatalf("Expected channel close for tx2") + t.Fatalf("Expected closed notification channels for tx2") } }