diff --git a/routing/chainview/btcd.go b/routing/chainview/btcd.go index 41cc57362..04c4ce842 100644 --- a/routing/chainview/btcd.go +++ b/routing/chainview/btcd.go @@ -1,14 +1,17 @@ package chainview import ( + "bytes" + "encoding/hex" "fmt" "sync" "sync/atomic" - "time" + "github.com/roasbeef/btcd/btcjson" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/rpcclient" "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" ) // BtcdFilteredChainView is an implementation of the FilteredChainView @@ -17,34 +20,27 @@ type BtcdFilteredChainView struct { started int32 stopped int32 - // bestHash is the hash of the latest block in the main chain. - bestHash chainhash.Hash - - // bestHeight is the height of the latest block in the main chain. - bestHeight int32 + // bestHeight is the height of the latest block added to the + // blockQueue from the onFilteredConnectedMethod. It is used to + // determine up to what height we would need to rescan in case + // of a filter update. + bestHeightMtx sync.Mutex + bestHeight uint32 btcdConn *rpcclient.Client - // newBlocks is the channel in which new filtered blocks are sent over. - newBlocks chan *FilteredBlock - - // staleBlocks is the channel in which blocks that have been - // disconnected from the mainchain are sent over. - staleBlocks chan *FilteredBlock + // blockEventQueue is the ordered queue used to keep the order + // of connected and disconnected blocks sent to the reader of the + // chainView. + blockQueue *blockEventQueue // filterUpdates is a channel in which updates to the utxo filter // attached to this instance are sent over. filterUpdates chan filterUpdate - // The three field below are used to implement a synchronized queue - // that lets use instantly handle sent notifications without blocking - // the main websockets notification loop. - chainUpdates []*chainUpdate - chainUpdateSignal chan struct{} - chainUpdateMtx sync.Mutex - // chainFilter is the set of utox's that we're currently watching // spends for within the chain. + filterMtx sync.RWMutex chainFilter map[wire.OutPoint]struct{} // filterBlockReqs is a channel in which requests to filter select @@ -63,18 +59,15 @@ var _ FilteredChainView = (*BtcdFilteredChainView)(nil) // RPC credentials for an active btcd instance. func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainView, error) { chainView := &BtcdFilteredChainView{ - newBlocks: make(chan *FilteredBlock), - staleBlocks: make(chan *FilteredBlock), - chainUpdateSignal: make(chan struct{}), - chainFilter: make(map[wire.OutPoint]struct{}), - filterUpdates: make(chan filterUpdate), - filterBlockReqs: make(chan *filterBlockReq), - quit: make(chan struct{}), + chainFilter: make(map[wire.OutPoint]struct{}), + filterUpdates: make(chan filterUpdate), + filterBlockReqs: make(chan *filterBlockReq), + quit: make(chan struct{}), } ntfnCallbacks := &rpcclient.NotificationHandlers{ - OnBlockConnected: chainView.onBlockConnected, - OnBlockDisconnected: chainView.onBlockDisconnected, + OnFilteredBlockConnected: chainView.onFilteredBlockConnected, + OnFilteredBlockDisconnected: chainView.onFilteredBlockDisconnected, } // Disable connecting to btcd within the rpcclient.New method. We @@ -87,6 +80,8 @@ func NewBtcdFilteredChainView(config rpcclient.ConnConfig) (*BtcdFilteredChainVi } chainView.btcdConn = chainConn + chainView.blockQueue = newBlockEventQueue() + return chainView, nil } @@ -110,12 +105,16 @@ func (b *BtcdFilteredChainView) Start() error { return err } - bestHash, bestHeight, err := b.btcdConn.GetBestBlock() + _, bestHeight, err := b.btcdConn.GetBestBlock() if err != nil { return err } - b.bestHash, b.bestHeight = *bestHash, bestHeight + b.bestHeightMtx.Lock() + b.bestHeight = uint32(bestHeight) + b.bestHeightMtx.Unlock() + + b.blockQueue.Start() b.wg.Add(1) go b.chainFilterer() @@ -137,6 +136,8 @@ func (b *BtcdFilteredChainView) Stop() error { // cleans up all related resources. b.btcdConn.Shutdown() + b.blockQueue.Stop() + log.Infof("FilteredChainView stopping") close(b.quit) @@ -145,39 +146,68 @@ func (b *BtcdFilteredChainView) Stop() error { return nil } -// chainUpdate encapsulates an update to the current main chain. This struct is -// used as an element within an unbounded queue in order to avoid blocking the -// main rpc dispatch rule. -type chainUpdate struct { - blockHash *chainhash.Hash - blockHeight int32 +// onFilteredBlockConnected is called for each block that's connected to the +// end of the main chain. Based on our current chain filter, the block may or +// may not include any relevant transactions. +func (b *BtcdFilteredChainView) onFilteredBlockConnected(height int32, + header *wire.BlockHeader, txns []*btcutil.Tx) { + + mtxs := make([]*wire.MsgTx, len(txns)) + for i, tx := range txns { + mtx := tx.MsgTx() + mtxs[i] = mtx + + for _, txIn := range mtx.TxIn { + // We can delete this outpoint from the chainFilter, as + // we just received a block where it was spent. In case + // of a reorg, this outpoint might get "un-spent", but + // that's okay since it would never be wise to consider + // the channel open again (since a spending transaction + // exists on the network). + b.filterMtx.Lock() + delete(b.chainFilter, txIn.PreviousOutPoint) + b.filterMtx.Unlock() + } + + } + + // We record the height of the last connected block added to the + // blockQueue such that we can scan up to this height in case of + // a rescan. It must be protected by a mutex since a filter update + // might be trying to read it concurrently. + b.bestHeightMtx.Lock() + b.bestHeight = uint32(height) + b.bestHeightMtx.Unlock() + + block := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + Transactions: mtxs, + } + + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: block, + }) } -// onBlockConnected implements on OnBlockConnected callback for rpcclient. -// Ingesting a block updates the wallet's internal utxo state based on the -// outputs created and destroyed within each block. -func (b *BtcdFilteredChainView) onBlockConnected(hash *chainhash.Hash, - height int32, t time.Time) { +// onFilteredBlockDisconnected is a callback which is executed once a block is +// disconnected from the end of the main chain. +func (b *BtcdFilteredChainView) onFilteredBlockDisconnected(height int32, + header *wire.BlockHeader) { - // Append this new chain update to the end of the queue of new chain - // updates. - b.chainUpdateMtx.Lock() - b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height}) - b.chainUpdateMtx.Unlock() + log.Debugf("got disconnected block at height %d: %v", height, + header.BlockHash()) - // Launch a goroutine to signal the notification dispatcher that a new - // block update is available. We do this in a new goroutine in order to - // avoid blocking the main loop of the rpc client. - go func() { - b.chainUpdateSignal <- struct{}{} - }() -} + filteredBlock := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + } -// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. -func (b *BtcdFilteredChainView) onBlockDisconnected(hash *chainhash.Hash, - height int32, t time.Time) { - - // TODO(roasbeef): impl + b.blockQueue.Add(&blockEvent{ + eventType: disconnected, + block: filteredBlock, + }) } // filterBlockReq houses a request to manually filter a block specified by @@ -231,7 +261,9 @@ func (b *BtcdFilteredChainView) chainFilterer() { if _, ok := b.chainFilter[prevOp]; ok { filteredTxns = append(filteredTxns, tx) + b.filterMtx.Lock() delete(b.chainFilter, prevOp) + b.filterMtx.Unlock() break } @@ -241,87 +273,118 @@ func (b *BtcdFilteredChainView) chainFilterer() { return filteredTxns } + decodeJSONBlock := func(block *btcjson.RescannedBlock, + height uint32) (*FilteredBlock, error) { + hash, err := chainhash.NewHashFromStr(block.Hash) + if err != nil { + return nil, err + + } + txs := make([]*wire.MsgTx, 0, len(block.Transactions)) + for _, str := range block.Transactions { + b, err := hex.DecodeString(str) + if err != nil { + return nil, err + } + tx := &wire.MsgTx{} + err = tx.Deserialize(bytes.NewReader(b)) + if err != nil { + return nil, err + } + txs = append(txs, tx) + } + return &FilteredBlock{ + Hash: *hash, + Height: height, + Transactions: txs, + }, nil + } + for { select { - - // A new block has been connected to the end of the main chain. - // So we'll need to dispatch a new FilteredBlock notification. - case <-b.chainUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - b.chainUpdateMtx.Lock() - update := b.chainUpdates[0] - b.chainUpdates[0] = nil // Set to nil to prevent GC leak. - b.chainUpdates = b.chainUpdates[1:] - b.chainUpdateMtx.Unlock() - - // Now that we have the new block has, fetch the new - // block itself. - newBlock, err := b.btcdConn.GetBlock(update.blockHash) - if err != nil { - log.Errorf("Unable to get block: %v", err) - continue - } - b.bestHash, b.bestHeight = *update.blockHash, update.blockHeight - - // Next, we'll scan this block to see if it modified - // any of the UTXO set that we're watching. - filteredTxns := filterBlock(newBlock) - - // Finally, launch a goroutine to dispatch this - // filtered block notification. - go func() { - b.newBlocks <- &FilteredBlock{ - Hash: *update.blockHash, - Height: uint32(update.blockHeight), - Transactions: filteredTxns, - } - }() - // The caller has just sent an update to the current chain // filter, so we'll apply the update, possibly rewinding our // state partially. case update := <-b.filterUpdates: + // First, we'll add all the new UTXO's to the set of // watched UTXO's, eliminating any duplicates in the // process. log.Debugf("Updating chain filter with new UTXO's: %v", update.newUtxos) for _, newOp := range update.newUtxos { + b.filterMtx.Lock() b.chainFilter[newOp] = struct{}{} + b.filterMtx.Unlock() } + // Apply the new TX filter to btcd, which will cause + // all following notifications from and calls to it + // return blocks filtered with the new filter. + b.btcdConn.LoadTxFilter(false, []btcutil.Address{}, + update.newUtxos) + + // All blocks gotten after we loaded the filter will + // have the filter applied, but we will need to rescan + // the blocks up to the height of the block we last + // added to the blockQueue. + b.bestHeightMtx.Lock() + bestHeight := b.bestHeight + b.bestHeightMtx.Unlock() + // If the update height matches our best known height, // then we don't need to do any rewinding. - if update.updateHeight == uint32(b.bestHeight) { + if update.updateHeight == bestHeight { continue } // Otherwise, we'll rewind the state to ensure the // caller doesn't miss any relevant notifications. // Starting from the height _after_ the update height, - // we'll walk forwards, manually filtering blocks. - for i := int32(update.updateHeight) + 1; i < b.bestHeight+1; i++ { + // we'll walk forwards, rescanning one block at a time + // with btcd applying the newly loaded filter to each + // block. + for i := update.updateHeight + 1; i < bestHeight+1; i++ { blockHash, err := b.btcdConn.GetBlockHash(int64(i)) if err != nil { - log.Errorf("Unable to get block hash: %v", err) + log.Warnf("Unable to get block hash "+ + "for block at height %d: %v", + i, err) continue } - block, err := b.btcdConn.GetBlock(blockHash) + + // To avoid dealing with the case where a reorg + // is happening while we rescan, we scan one + // block at a time, skipping blocks that might + // have gone missing. + rescanned, err := b.btcdConn.RescanBlocks( + []chainhash.Hash{*blockHash}) if err != nil { - log.Errorf("Unable to get block: %v", err) + log.Warnf("Unable to rescan block "+ + "with hash %v at height %d: %v", + blockHash, i, err) continue } - filteredTxns := filterBlock(block) - - go func(height uint32) { - b.newBlocks <- &FilteredBlock{ - Hash: *blockHash, - Height: height, - Transactions: filteredTxns, - } - }(uint32(i)) + // If no block was returned from the rescan, + // it means no maching transactions were found. + if len(rescanned) != 1 { + log.Debugf("no matching block found "+ + "for rescan of hash %v", + blockHash) + continue + } + decoded, err := decodeJSONBlock( + &rescanned[0], uint32(i)) + if err != nil { + log.Errorf("Unable to decode block: %v", + err) + continue + } + b.blockQueue.Add(&blockEvent{ + eventType: connected, + block: decoded, + }) } // We've received a new request to manually filter a block. @@ -393,7 +456,7 @@ func (b *BtcdFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight u // // NOTE: This is part of the FilteredChainView interface. func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { - return b.newBlocks + return b.blockQueue.newBlocks } // DisconnectedBlocks returns a receive only channel which will be sent upon @@ -402,5 +465,5 @@ func (b *BtcdFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { // // NOTE: This is part of the FilteredChainView interface. func (b *BtcdFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { - return b.staleBlocks + return b.blockQueue.staleBlocks }