mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-25 17:51:17 +02:00
chainntnfs: cache confirm hints within TxConfNotifier
In this commit, we extend our TxConfNotifier to cache height hints for our confirmation events. Each transaction we've requested a confirmation notification for will have its initial height hint cached. We increment this height hint at every new block for unconfirmed transactions. This allows us to retrieve the *exact* height at which the transaction has been included in a block. By doing this, we optimize the different ChainNotifier implementations since they will no longer have to scan forward (and possibly fetch blocks in the neutrino/pruned node case) from the initial height hint looking for the confirmation.
This commit is contained in:
@@ -89,6 +89,11 @@ type TxConfNotifier struct {
|
|||||||
// at which the transaction will have sufficient confirmations.
|
// at which the transaction will have sufficient confirmations.
|
||||||
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
|
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
|
||||||
|
|
||||||
|
// hintCache is a cache used to maintain the latest height hints for
|
||||||
|
// transactions. Each height hint represents the earliest height at
|
||||||
|
// which the transactions could have been confirmed within the chain.
|
||||||
|
hintCache ConfirmHintCache
|
||||||
|
|
||||||
// quit is closed in order to signal that the notifier is gracefully
|
// quit is closed in order to signal that the notifier is gracefully
|
||||||
// exiting.
|
// exiting.
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
@@ -98,13 +103,16 @@ type TxConfNotifier struct {
|
|||||||
|
|
||||||
// NewTxConfNotifier creates a TxConfNotifier. The current height of the
|
// NewTxConfNotifier creates a TxConfNotifier. The current height of the
|
||||||
// blockchain is accepted as a parameter.
|
// blockchain is accepted as a parameter.
|
||||||
func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32) *TxConfNotifier {
|
func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32,
|
||||||
|
hintCache ConfirmHintCache) *TxConfNotifier {
|
||||||
|
|
||||||
return &TxConfNotifier{
|
return &TxConfNotifier{
|
||||||
currentHeight: startHeight,
|
currentHeight: startHeight,
|
||||||
reorgSafetyLimit: reorgSafetyLimit,
|
reorgSafetyLimit: reorgSafetyLimit,
|
||||||
confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn),
|
confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn),
|
||||||
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
|
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
|
||||||
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
|
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
|
||||||
|
hintCache: hintCache,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -130,6 +138,16 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
ntfns = make(map[uint64]*ConfNtfn)
|
ntfns = make(map[uint64]*ConfNtfn)
|
||||||
tcn.confNotifications[*ntfn.TxID] = ntfns
|
tcn.confNotifications[*ntfn.TxID] = ntfns
|
||||||
|
|
||||||
|
err := tcn.hintCache.CommitConfirmHint(
|
||||||
|
tcn.currentHeight, *ntfn.TxID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
// The error is not fatal, so we should not return an
|
||||||
|
// error to the caller.
|
||||||
|
Log.Errorf("Unable to update confirm hint to %d for "+
|
||||||
|
"%v: %v", tcn.currentHeight, *ntfn.TxID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ntfns[ntfn.ConfID] = ntfn
|
ntfns[ntfn.ConfID] = ntfn
|
||||||
@@ -175,6 +193,14 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid)
|
||||||
|
if err != nil {
|
||||||
|
// The error is not fatal, so we should not return an error to
|
||||||
|
// the caller.
|
||||||
|
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
|
||||||
|
details.BlockHeight, txid, err)
|
||||||
|
}
|
||||||
|
|
||||||
// The notifier has yet to reach the height at which the transaction was
|
// The notifier has yet to reach the height at which the transaction was
|
||||||
// included in a block, so we should defer until handling it then within
|
// included in a block, so we should defer until handling it then within
|
||||||
// ConnectTip.
|
// ConnectTip.
|
||||||
@@ -297,6 +323,48 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In order to update the height hint for all the required transactions
|
||||||
|
// under one database transaction, we'll gather the set of unconfirmed
|
||||||
|
// transactions along with the ones that confirmed at the current
|
||||||
|
// height. To do so, we'll iterate over the confNotifications map, which
|
||||||
|
// contains the transactions we currently have notifications for. Since
|
||||||
|
// this map doesn't tell us whether the transaction hsa confirmed or
|
||||||
|
// not, we'll need to look at txsByInitialHeight to determine so.
|
||||||
|
var txsToUpdateHints []chainhash.Hash
|
||||||
|
for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] {
|
||||||
|
txsToUpdateHints = append(txsToUpdateHints, confirmedTx)
|
||||||
|
}
|
||||||
|
out:
|
||||||
|
for maybeUnconfirmedTx := range tcn.confNotifications {
|
||||||
|
for height, confirmedTxs := range tcn.txsByInitialHeight {
|
||||||
|
// Skip the transactions that confirmed at the new block
|
||||||
|
// height as those have already been added.
|
||||||
|
if height == blockHeight {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the transaction was found within the set of
|
||||||
|
// confirmed transactions at this height, we'll skip it.
|
||||||
|
if _, ok := confirmedTxs[maybeUnconfirmedTx]; ok {
|
||||||
|
continue out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
txsToUpdateHints = append(txsToUpdateHints, maybeUnconfirmedTx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(txsToUpdateHints) > 0 {
|
||||||
|
err := tcn.hintCache.CommitConfirmHint(
|
||||||
|
tcn.currentHeight, txsToUpdateHints...,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
// The error is not fatal, so we should not return an
|
||||||
|
// error to the caller.
|
||||||
|
Log.Errorf("Unable to update confirm hint to %d for "+
|
||||||
|
"%v: %v", tcn.currentHeight, txsToUpdateHints,
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Next, we'll dispatch an update to all of the notification clients for
|
// Next, we'll dispatch an update to all of the notification clients for
|
||||||
// our watched transactions with the number of confirmations left at
|
// our watched transactions with the number of confirmations left at
|
||||||
// this new height.
|
// this new height.
|
||||||
@@ -447,6 +515,20 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rewind the height hint for all watched transactions.
|
||||||
|
var txs []chainhash.Hash
|
||||||
|
for tx := range tcn.confNotifications {
|
||||||
|
txs = append(txs, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...)
|
||||||
|
if err != nil {
|
||||||
|
// The error is not fatal, so we should not return an error to
|
||||||
|
// the caller.
|
||||||
|
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
|
||||||
|
tcn.currentHeight, txs, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Finally, we can remove the transactions we're currently watching that
|
// Finally, we can remove the transactions we're currently watching that
|
||||||
// were included in this block height.
|
// were included in this block height.
|
||||||
delete(tcn.txsByInitialHeight, blockHeight)
|
delete(tcn.txsByInitialHeight, blockHeight)
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package chainntnfs_test
|
package chainntnfs_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
@@ -11,6 +12,90 @@ import (
|
|||||||
|
|
||||||
var zeroHash chainhash.Hash
|
var zeroHash chainhash.Hash
|
||||||
|
|
||||||
|
type mockHintCache struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
confHints map[chainhash.Hash]uint32
|
||||||
|
spendHints map[wire.OutPoint]uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ chainntnfs.SpendHintCache = (*mockHintCache)(nil)
|
||||||
|
var _ chainntnfs.ConfirmHintCache = (*mockHintCache)(nil)
|
||||||
|
|
||||||
|
func (c *mockHintCache) CommitSpendHint(heightHint uint32, ops ...wire.OutPoint) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
for _, op := range ops {
|
||||||
|
c.spendHints[op] = heightHint
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockHintCache) QuerySpendHint(op wire.OutPoint) (uint32, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
hint, ok := c.spendHints[op]
|
||||||
|
if !ok {
|
||||||
|
return 0, chainntnfs.ErrSpendHintNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return hint, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockHintCache) PurgeSpendHint(ops ...wire.OutPoint) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
for _, op := range ops {
|
||||||
|
delete(c.spendHints, op)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockHintCache) CommitConfirmHint(heightHint uint32, txids ...chainhash.Hash) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
for _, txid := range txids {
|
||||||
|
c.confHints[txid] = heightHint
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockHintCache) QueryConfirmHint(txid chainhash.Hash) (uint32, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
hint, ok := c.confHints[txid]
|
||||||
|
if !ok {
|
||||||
|
return 0, chainntnfs.ErrConfirmHintNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return hint, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockHintCache) PurgeConfirmHint(txids ...chainhash.Hash) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
for _, txid := range txids {
|
||||||
|
delete(c.confHints, txid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockHintCache() *mockHintCache {
|
||||||
|
return &mockHintCache{
|
||||||
|
confHints: make(map[chainhash.Hash]uint32),
|
||||||
|
spendHints: make(map[wire.OutPoint]uint32),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestTxConfFutureDispatch tests that the TxConfNotifier dispatches
|
// TestTxConfFutureDispatch tests that the TxConfNotifier dispatches
|
||||||
// registered notifications when the transaction confirms after registration.
|
// registered notifications when the transaction confirms after registration.
|
||||||
func TestTxConfFutureDispatch(t *testing.T) {
|
func TestTxConfFutureDispatch(t *testing.T) {
|
||||||
@@ -27,7 +112,8 @@ func TestTxConfFutureDispatch(t *testing.T) {
|
|||||||
tx3 = wire.MsgTx{Version: 3}
|
tx3 = wire.MsgTx{Version: 3}
|
||||||
)
|
)
|
||||||
|
|
||||||
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
|
hintCache := newMockHintCache()
|
||||||
|
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
|
||||||
|
|
||||||
// Create the test transactions and register them with the
|
// Create the test transactions and register them with the
|
||||||
// TxConfNotifier before including them in a block to receive future
|
// TxConfNotifier before including them in a block to receive future
|
||||||
@@ -200,7 +286,8 @@ func TestTxConfHistoricalDispatch(t *testing.T) {
|
|||||||
tx3 = wire.MsgTx{Version: 3}
|
tx3 = wire.MsgTx{Version: 3}
|
||||||
)
|
)
|
||||||
|
|
||||||
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
|
hintCache := newMockHintCache()
|
||||||
|
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
|
||||||
|
|
||||||
// Create the test transactions at a height before the TxConfNotifier's
|
// Create the test transactions at a height before the TxConfNotifier's
|
||||||
// starting height so that they are confirmed once registering them.
|
// starting height so that they are confirmed once registering them.
|
||||||
@@ -351,7 +438,8 @@ func TestTxConfChainReorg(t *testing.T) {
|
|||||||
tx3 = wire.MsgTx{Version: 3}
|
tx3 = wire.MsgTx{Version: 3}
|
||||||
)
|
)
|
||||||
|
|
||||||
txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100)
|
hintCache := newMockHintCache()
|
||||||
|
txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache)
|
||||||
|
|
||||||
// Tx 1 will be confirmed in block 9 and requires 2 confs.
|
// Tx 1 will be confirmed in block 9 and requires 2 confs.
|
||||||
tx1Hash := tx1.TxHash()
|
tx1Hash := tx1.TxHash()
|
||||||
@@ -586,6 +674,147 @@ func TestTxConfChainReorg(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTxConfHeightHintCache ensures that the height hints for transactions are
|
||||||
|
// kept track of correctly with each new block connected/disconnected.
|
||||||
|
func TestTxConfHeightHintCache(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const (
|
||||||
|
startingHeight = 10
|
||||||
|
tx1Height = 11
|
||||||
|
tx2Height = 12
|
||||||
|
)
|
||||||
|
|
||||||
|
// Initialize our TxConfNotifier instance backed by a height hint cache.
|
||||||
|
hintCache := newMockHintCache()
|
||||||
|
txConfNotifier := chainntnfs.NewTxConfNotifier(
|
||||||
|
startingHeight, 100, hintCache,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create two test transactions and register them for notifications.
|
||||||
|
tx1 := wire.MsgTx{Version: 1}
|
||||||
|
tx1Hash := tx1.TxHash()
|
||||||
|
ntfn1 := &chainntnfs.ConfNtfn{
|
||||||
|
TxID: &tx1Hash,
|
||||||
|
NumConfirmations: 1,
|
||||||
|
Event: chainntnfs.NewConfirmationEvent(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
tx2 := wire.MsgTx{Version: 2}
|
||||||
|
tx2Hash := tx2.TxHash()
|
||||||
|
ntfn2 := &chainntnfs.ConfNtfn{
|
||||||
|
TxID: &tx2Hash,
|
||||||
|
NumConfirmations: 2,
|
||||||
|
Event: chainntnfs.NewConfirmationEvent(2),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := txConfNotifier.Register(ntfn1); err != nil {
|
||||||
|
t.Fatalf("unable to register tx1: %v", err)
|
||||||
|
}
|
||||||
|
if err := txConfNotifier.Register(ntfn2); err != nil {
|
||||||
|
t.Fatalf("unable to register tx2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Both transactions should have a height hint of the starting height
|
||||||
|
// due to registering notifications for them.
|
||||||
|
hint, err := hintCache.QueryConfirmHint(tx1Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != startingHeight {
|
||||||
|
t.Fatalf("expected hint %d, got %d", startingHeight, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx2Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != startingHeight {
|
||||||
|
t.Fatalf("expected hint %d, got %d", startingHeight, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new block that will include the first transaction and extend
|
||||||
|
// the chain.
|
||||||
|
block1 := btcutil.NewBlock(&wire.MsgBlock{
|
||||||
|
Transactions: []*wire.MsgTx{&tx1},
|
||||||
|
})
|
||||||
|
|
||||||
|
err = txConfNotifier.ConnectTip(
|
||||||
|
block1.Hash(), tx1Height, block1.Transactions(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to connect block: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The height hint for the first transaction should now be updated to
|
||||||
|
// reflect its confirmation.
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx1Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != tx1Height {
|
||||||
|
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The height hint for the second transaction should also be updated due
|
||||||
|
// to it still being unconfirmed.
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx2Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != tx1Height {
|
||||||
|
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, we'll create another block that will include the second
|
||||||
|
// transaction and extend the chain.
|
||||||
|
block2 := btcutil.NewBlock(&wire.MsgBlock{
|
||||||
|
Transactions: []*wire.MsgTx{&tx2},
|
||||||
|
})
|
||||||
|
|
||||||
|
err = txConfNotifier.ConnectTip(
|
||||||
|
block2.Hash(), tx2Height, block2.Transactions(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to connect block: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The height hint for the first transaction should remain the same.
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx1Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != tx1Height {
|
||||||
|
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The height hint for the second transaction should now be updated to
|
||||||
|
// reflect its confirmation.
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx2Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != tx2Height {
|
||||||
|
t.Fatalf("expected hint %d, got %d", tx2Height, hint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, we'll attempt do disconnect the last block in order to simulate
|
||||||
|
// a chain reorg.
|
||||||
|
if err := txConfNotifier.DisconnectTip(tx2Height); err != nil {
|
||||||
|
t.Fatalf("Failed to disconnect block: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should update the second transaction's height hint within the
|
||||||
|
// cache to the previous height.
|
||||||
|
hint, err = hintCache.QueryConfirmHint(tx2Hash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to query for hint: %v", err)
|
||||||
|
}
|
||||||
|
if hint != tx1Height {
|
||||||
|
t.Fatalf("expected hint %d, got %d", tx1Height, hint)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTxConfTearDown(t *testing.T) {
|
func TestTxConfTearDown(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@@ -594,7 +823,8 @@ func TestTxConfTearDown(t *testing.T) {
|
|||||||
tx2 = wire.MsgTx{Version: 2}
|
tx2 = wire.MsgTx{Version: 2}
|
||||||
)
|
)
|
||||||
|
|
||||||
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100)
|
hintCache := newMockHintCache()
|
||||||
|
txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache)
|
||||||
|
|
||||||
// Create the test transactions and register them with the
|
// Create the test transactions and register them with the
|
||||||
// TxConfNotifier to receive notifications.
|
// TxConfNotifier to receive notifications.
|
||||||
|
Reference in New Issue
Block a user