From 6be642a0338dce469ceb2813e8005d3656372cd7 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 22 May 2018 15:55:32 -0400 Subject: [PATCH] chainntnfs: cache confirm hints within TxConfNotifier In this commit, we extend our TxConfNotifier to cache height hints for our confirmation events. Each transaction we've requested a confirmation notification for will have its initial height hint cached. We increment this height hint at every new block for unconfirmed transactions. This allows us to retrieve the *exact* height at which the transaction has been included in a block. By doing this, we optimize the different ChainNotifier implementations since they will no longer have to scan forward (and possibly fetch blocks in the neutrino/pruned node case) from the initial height hint looking for the confirmation. --- chainntnfs/txconfnotifier.go | 84 ++++++++++- chainntnfs/txconfnotifier_test.go | 238 +++++++++++++++++++++++++++++- 2 files changed, 317 insertions(+), 5 deletions(-) diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index bb049b8a2..ff97b23e7 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -89,6 +89,11 @@ type TxConfNotifier struct { // at which the transaction will have sufficient confirmations. ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} + // hintCache is a cache used to maintain the latest height hints for + // transactions. Each height hint represents the earliest height at + // which the transactions could have been confirmed within the chain. + hintCache ConfirmHintCache + // quit is closed in order to signal that the notifier is gracefully // exiting. quit chan struct{} @@ -98,13 +103,16 @@ type TxConfNotifier struct { // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. -func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier { +func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, + hintCache ConfirmHintCache) *TxConfNotifier { + return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), + hintCache: hintCache, quit: make(chan struct{}), } } @@ -130,6 +138,16 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { if !ok { ntfns = make(map[uint64]*ConfNtfn) tcn.confNotifications[*ntfn.TxID] = ntfns + + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, *ntfn.TxID, + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, *ntfn.TxID, err) + } } ntfns[ntfn.ConfID] = ntfn @@ -175,6 +193,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, return nil } + err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + details.BlockHeight, txid, err) + } + // The notifier has yet to reach the height at which the transaction was // included in a block, so we should defer until handling it then within // ConnectTip. @@ -297,6 +323,48 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } } + // In order to update the height hint for all the required transactions + // under one database transaction, we'll gather the set of unconfirmed + // transactions along with the ones that confirmed at the current + // height. To do so, we'll iterate over the confNotifications map, which + // contains the transactions we currently have notifications for. Since + // this map doesn't tell us whether the transaction hsa confirmed or + // not, we'll need to look at txsByInitialHeight to determine so. + var txsToUpdateHints []chainhash.Hash + for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] { + txsToUpdateHints = append(txsToUpdateHints, confirmedTx) + } +out: + for maybeUnconfirmedTx := range tcn.confNotifications { + for height, confirmedTxs := range tcn.txsByInitialHeight { + // Skip the transactions that confirmed at the new block + // height as those have already been added. + if height == blockHeight { + continue + } + + // If the transaction was found within the set of + // confirmed transactions at this height, we'll skip it. + if _, ok := confirmedTxs[maybeUnconfirmedTx]; ok { + continue out + } + } + txsToUpdateHints = append(txsToUpdateHints, maybeUnconfirmedTx) + } + + if len(txsToUpdateHints) > 0 { + err := tcn.hintCache.CommitConfirmHint( + tcn.currentHeight, txsToUpdateHints..., + ) + if err != nil { + // The error is not fatal, so we should not return an + // error to the caller. + Log.Errorf("Unable to update confirm hint to %d for "+ + "%v: %v", tcn.currentHeight, txsToUpdateHints, + err) + } + } + // Next, we'll dispatch an update to all of the notification clients for // our watched transactions with the number of confirmations left at // this new height. @@ -447,6 +515,20 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { } } + // Rewind the height hint for all watched transactions. + var txs []chainhash.Hash + for tx := range tcn.confNotifications { + txs = append(txs, tx) + } + + err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...) + if err != nil { + // The error is not fatal, so we should not return an error to + // the caller. + Log.Errorf("Unable to update confirm hint to %d for %v: %v", + tcn.currentHeight, txs, err) + } + // Finally, we can remove the transactions we're currently watching that // were included in this block height. delete(tcn.txsByInitialHeight, blockHeight) diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index 2a57245a5..fb996d0a3 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -1,6 +1,7 @@ package chainntnfs_test import ( + "sync" "testing" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -11,6 +12,90 @@ import ( var zeroHash chainhash.Hash +type mockHintCache struct { + mu sync.Mutex + confHints map[chainhash.Hash]uint32 + spendHints map[wire.OutPoint]uint32 +} + +var _ chainntnfs.SpendHintCache = (*mockHintCache)(nil) +var _ chainntnfs.ConfirmHintCache = (*mockHintCache)(nil) + +func (c *mockHintCache) CommitSpendHint(heightHint uint32, ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + c.spendHints[op] = heightHint + } + + return nil +} + +func (c *mockHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.spendHints[op] + if !ok { + return 0, chainntnfs.ErrSpendHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeSpendHint(ops ...wire.OutPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, op := range ops { + delete(c.spendHints, op) + } + + return nil +} + +func (c *mockHintCache) CommitConfirmHint(heightHint uint32, txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + c.confHints[txid] = heightHint + } + + return nil +} + +func (c *mockHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) { + c.mu.Lock() + defer c.mu.Unlock() + + hint, ok := c.confHints[txid] + if !ok { + return 0, chainntnfs.ErrConfirmHintNotFound + } + + return hint, nil +} + +func (c *mockHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, txid := range txids { + delete(c.confHints, txid) + } + + return nil +} + +func newMockHintCache() *mockHintCache { + return &mockHintCache{ + confHints: make(map[chainhash.Hash]uint32), + spendHints: make(map[wire.OutPoint]uint32), + } +} + // TestTxConfFutureDispatch tests that the TxConfNotifier dispatches // registered notifications when the transaction confirms after registration. func TestTxConfFutureDispatch(t *testing.T) { @@ -27,7 +112,8 @@ func TestTxConfFutureDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier before including them in a block to receive future @@ -200,7 +286,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions at a height before the TxConfNotifier's // starting height so that they are confirmed once registering them. @@ -351,7 +438,8 @@ func TestTxConfChainReorg(t *testing.T) { tx3 = wire.MsgTx{Version: 3} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache) // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() @@ -586,6 +674,147 @@ func TestTxConfChainReorg(t *testing.T) { } } +// TestTxConfHeightHintCache ensures that the height hints for transactions are +// kept track of correctly with each new block connected/disconnected. +func TestTxConfHeightHintCache(t *testing.T) { + t.Parallel() + + const ( + startingHeight = 10 + tx1Height = 11 + tx2Height = 12 + ) + + // Initialize our TxConfNotifier instance backed by a height hint cache. + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier( + startingHeight, 100, hintCache, + ) + + // Create two test transactions and register them for notifications. + tx1 := wire.MsgTx{Version: 1} + tx1Hash := tx1.TxHash() + ntfn1 := &chainntnfs.ConfNtfn{ + TxID: &tx1Hash, + NumConfirmations: 1, + Event: chainntnfs.NewConfirmationEvent(1), + } + + tx2 := wire.MsgTx{Version: 2} + tx2Hash := tx2.TxHash() + ntfn2 := &chainntnfs.ConfNtfn{ + TxID: &tx2Hash, + NumConfirmations: 2, + Event: chainntnfs.NewConfirmationEvent(2), + } + + if err := txConfNotifier.Register(ntfn1); err != nil { + t.Fatalf("unable to register tx1: %v", err) + } + if err := txConfNotifier.Register(ntfn2); err != nil { + t.Fatalf("unable to register tx2: %v", err) + } + + // Both transactions should have a height hint of the starting height + // due to registering notifications for them. + hint, err := hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != startingHeight { + t.Fatalf("expected hint %d, got %d", startingHeight, hint) + } + + // Create a new block that will include the first transaction and extend + // the chain. + block1 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + + err = txConfNotifier.ConnectTip( + block1.Hash(), tx1Height, block1.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should also be updated due + // to it still being unconfirmed. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // Now, we'll create another block that will include the second + // transaction and extend the chain. + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx2}, + }) + + err = txConfNotifier.ConnectTip( + block2.Hash(), tx2Height, block2.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // The height hint for the first transaction should remain the same. + hint, err = hintCache.QueryConfirmHint(tx1Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } + + // The height hint for the second transaction should now be updated to + // reflect its confirmation. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx2Height { + t.Fatalf("expected hint %d, got %d", tx2Height, hint) + } + + // Now, we'll attempt do disconnect the last block in order to simulate + // a chain reorg. + if err := txConfNotifier.DisconnectTip(tx2Height); err != nil { + t.Fatalf("Failed to disconnect block: %v", err) + } + + // This should update the second transaction's height hint within the + // cache to the previous height. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", tx1Height, hint) + } +} + func TestTxConfTearDown(t *testing.T) { t.Parallel() @@ -594,7 +823,8 @@ func TestTxConfTearDown(t *testing.T) { tx2 = wire.MsgTx{Version: 2} ) - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100) + hintCache := newMockHintCache() + txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier to receive notifications.