diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index bc85d1c9d..193d3d85d 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -2,7 +2,6 @@ package btcdnotify import ( "container/heap" - "strings" "sync" "sync/atomic" "time" @@ -212,6 +211,8 @@ func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetai // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. func (b *BtcdNotifier) notificationDispatcher() { + var currentHeight int32 + out: for { select { @@ -226,9 +227,14 @@ out: chainntnfs.Log.Infof("New confirmations "+ "subscription: txid=%v, numconfs=%v", *msg.txid, msg.numConfirmations) - // TODO(roasbeef): perform a N-block look - // behind to catch race-condition due to faster - // inter-block time? + + // If the notification can be partially or + // fully dispatched, then we can skip the first + // phase for ntfns. + if b.attemptHistoricalDispatch(msg, currentHeight) { + continue + } + txid := *msg.txid b.confNotifications[txid] = append(b.confNotifications[txid], msg) case *blockEpochRegistration: @@ -252,6 +258,8 @@ out: b.chainUpdates = b.chainUpdates[1:] b.chainUpdateMtx.Unlock() + currentHeight = update.blockHeight + newBlock, err := b.chainConn.GetBlock(update.blockHash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) @@ -329,6 +337,41 @@ out: b.wg.Done() } +// attemptHistoricalDispatch tries to use historical information to decide if a +// notificaiton ca be disptahced immediately, or is partially confirmed so it +// can skip straight to the confirmations heap. +func (b *BtcdNotifier) attemptHistoricalDispatch(msg *confirmationsNotification, + currentHeight int32) bool { + + // If the transaction already has some or all of the confirmations, + // then we may be able to + // dispatch it immediately. + tx, err := b.chainConn.GetRawTransactionVerbose(msg.txid) + if err != nil { + return false + } + + // If the transaction has more that enough confirmations, then we can + // dispatch it immediately after obtaininig for information w.r.t + // exaclty *when* if got all its confirmations. + if uint32(tx.Confirmations) >= msg.numConfirmations { + msg.finConf <- int32(tx.Confirmations) + return true + } + + // Otherwise, the transaciton has only been *partially* confirmed, so + // we need to insert it into the confirmationheap. + confsLeft := msg.numConfirmations - uint32(tx.Confirmations) + confHeight := uint32(currentHeight) + confsLeft + heapEntry := &confEntry{ + msg, + confHeight, + } + heap.Push(b.confHeap, heapEntry) + + return false +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *wire.ShaHash) { @@ -506,19 +549,6 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash, b.notificationRegistry <- ntfn - // The following conditional checks transaction confirmation notification - // requests so that if the transaction has already been included in a block - // with the requested number of confirmations, the notification will be - // dispatched immediately. - tx, err := b.chainConn.GetRawTransactionVerbose(txid) - if err != nil { - if !strings.Contains(err.Error(), "No information") { - return nil, err - } - } else if uint32(tx.Confirmations) > numConfs { - ntfn.finConf <- int32(tx.Confirmations) - } - return &chainntnfs.ConfirmationEvent{ Confirmed: ntfn.finConf, NegativeConf: ntfn.negativeConf, diff --git a/chainntnfs/interface_test.go b/chainntnfs/interface_test.go index 8ab5b039c..66d378848 100644 --- a/chainntnfs/interface_test.go +++ b/chainntnfs/interface_test.go @@ -413,15 +413,15 @@ func testMultiClientConfirmationNotification(miner *rpctest.Harness, } // Tests the case in which a confirmation notification is requested for a -// transaction that has already been included in a block. In this case, -// the confirmation notification should be dispatched immediately. +// transaction that has already been included in a block. In this case, the +// confirmation notification should be dispatched immediately. func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, notifier chainntnfs.ChainNotifier, t *testing.T) { t.Logf("testing transaction confirmed before notification registration") - // First, let's send some coins to "ourself", obtainig a txid. - // We're spending from a coinbase output here, so we use the dedicated + // First, let's send some coins to "ourself", obtaining a txid. We're + // spending from a coinbase output here, so we use the dedicated // function. txid, err := getTestTxId(miner) @@ -429,10 +429,10 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to create test tx: %v", err) } - // Now generate one block. The notifier must check older blocks when the - // confirmation event is registered below to ensure that the TXID hasn't - // already been included in the chain, otherwise the notification will - // never be sent. + // Now generate one block. The notifier must check older blocks when + // the confirmation event is registered below to ensure that the TXID + // hasn't already been included in the chain, otherwise the + // notification will never be sent. if _, err := miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate two blocks: %v", err) } @@ -456,6 +456,48 @@ func testTxConfirmedBeforeNtfnRegistration(miner *rpctest.Harness, case <-time.After(2 * time.Second): t.Fatalf("confirmation notification never received") } + + // Next, we want to test fully dispatching the notification for a + // transaction that has been *partially* confirmed. So we'll create + // another test txid. + txid, err = getTestTxId(miner) + if err != nil { + t.Fatalf("unable to create test tx: %v", err) + } + + // We'll request 6 confirmations for the above generated txid, but we + // will generate the confirmations in chunks. + numConfs = 6 + + // First, generate 2 confirmations. + if _, err := miner.Node.Generate(2); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + // Next, register for the notification *after* the transition has + // already been partially confirmed. + confIntent, err = notifier.RegisterConfirmationsNtfn(txid, numConfs) + if err != nil { + t.Fatalf("unable to register ntfn: %v", err) + } + + // With the notification registered, generate another 4 blocks, this + // should dispatch the notification. + if _, err := miner.Node.Generate(4); err != nil { + t.Fatalf("unable to generate blocks: %v", err) + } + + confSent = make(chan int32) + go func() { + confSent <- <-confIntent.Confirmed + }() + + select { + case <-confSent: + break + case <-time.After(2 * time.Second): + t.Fatalf("confirmation notification never received") + } } // Tests the case in which a spend notification is requested for a spend that @@ -466,11 +508,10 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, t.Logf("testing spend broadcast before notification registration") - // We'd like to test the spend notifications for all - // ChainNotifier concrete implemenations. + // We'd like to test the spend notifications for all ChainNotifier + // concrete implementations. // - // To do so, we first create a new output to our test target - // address. + // To do so, we first create a new output to our test target address. txid, err := getTestTxId(miner) if err != nil { t.Fatalf("unable to create test addr: %v", err) @@ -488,8 +529,8 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, } tx := wrappedTx.MsgTx() - // Locate the output index sent to us. We need this so we can - // construct a spending txn below. + // Locate the output index sent to us. We need this so we can construct + // a spending txn below. outIndex := -1 var pkScript []byte for i, txOut := range tx.TxOut { @@ -534,8 +575,9 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness, t.Fatalf("unable to generate single block: %v", err) } - // Now, we register to be notified of a spend that has already happened. - // The notifier should dispatch a spend notification immediately. + // Now, we register to be notified of a spend that has already + // happened. The notifier should dispatch a spend notification + // immediately. spentIntent, err := notifier.RegisterSpendNtfn(outpoint) if err != nil { t.Fatalf("unable to register for spend ntfn: %v", err)