diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index a93726e24..21e7ae172 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -207,6 +207,23 @@ func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t ti } } +// filteredBlock represents a new block which has been connected to the main +// chain. The slice of transactions will only be populated if the block +// includes a transaction that confirmed one of our watched txids, or spends +// one of the outputs currently being watched. +// TODO(halseth): this is currently used for complete blocks. Change to use +// onFilteredBlockConnected and onFilteredBlockDisconnected, making it easier +// to unify with the Neutrino implementation. +type filteredBlock struct { + hash chainhash.Hash + height uint32 + txns []*btcutil.Tx + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool +} + // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain @@ -322,12 +339,15 @@ out: chainntnfs.Log.Infof("New block: height=%v, sha=%v", update.blockHeight, update.blockHash) - b.notifyBlockEpochs(update.blockHeight, update.blockHash) - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txConfNotifier.ConnectTip(update.blockHash, - uint32(update.blockHeight), txns) - if err != nil { + + block := &filteredBlock{ + hash: *update.blockHash, + height: uint32(update.blockHeight), + txns: txns, + connect: true, + } + if err := b.handleBlockConnected(block); err != nil { chainntnfs.Log.Error(err) } continue @@ -350,6 +370,8 @@ out: chainntnfs.Log.Error(err) } + // NOTE: we currently only use txUpdates for mempool spends. It + // might get removed entirely in the future. case item := <-b.txUpdates.ChanOut(): newSpend := item.(*txUpdate) spendingTx := newSpend.tx @@ -381,7 +403,20 @@ out: spendDetails.SpendingHeight = currentHeight + 1 } - for _, ntfn := range clients { + // Keep spendNotifications that are + // waiting for a confirmation around. + // They will be notified when we find + // the spend within a block. + rem := make(map[uint64]*spendNotification) + for c, ntfn := range clients { + // If this client didn't want + // to be notified on mempool + // spends, store it for later. + if !ntfn.mempool { + rem[c] = ntfn + continue + } + chainntnfs.Log.Infof("Dispatching "+ "spend notification for "+ "outpoint=%v", ntfn.targetOutpoint) @@ -393,6 +428,12 @@ out: close(ntfn.spendChan) } delete(b.spendNotifications, prevOut) + + // If we had any clients left, add them + // back to the map. + if len(rem) > 0 { + b.spendNotifications[prevOut] = rem + } } } @@ -461,6 +502,65 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, return &txConf, nil } +// handleBlocksConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +// TODO(halseth): this is reusing the neutrino notifier implementation, unify +// them. +func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { + // First we'll notify any subscribed clients of the block. + b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Next, we'll scan over the list of relevant transactions and possibly + // dispatch notifications for confirmations and spends. + for _, tx := range newBlock.txns { + mtx := tx.MsgTx() + txSha := mtx.TxHash() + + for i, txIn := range mtx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an output which we have a + // registered notification for, then create a spend summary, finally + // sending off the details to the notification subscriber. + clients, ok := b.spendNotifications[prevOut] + if !ok { + continue + } + + // TODO(roasbeef): many integration tests expect spend to be + // notified within the mempool. + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txSha, + SpendingTx: mtx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(newBlock.height), + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and + // the message can still be read by the receiver. + close(ntfn.spendChan) + } + + delete(b.spendNotifications, prevOut) + } + } + + // A new block has been connected to the main chain. + // Send out any N confirmation notifications which may + // have been triggered by this new block. + b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns) + + return nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { @@ -489,6 +589,7 @@ type spendNotification struct { spendChan chan *chainntnfs.SpendDetail spendID uint64 + mempool bool } // spendCancel is a message sent to the BtcdNotifier when a client wishes to @@ -506,12 +607,13 @@ type spendCancel struct { // outpoint has been detected, the details of the spending event will be sent // across the 'Spend' channel. func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, - _ uint32) (*chainntnfs.SpendEvent, error) { + _ uint32, mempool bool) (*chainntnfs.SpendEvent, error) { ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), spendID: atomic.AddUint64(&b.spendClientCounter, 1), + mempool: mempool, } select {