diff --git a/src/bench/CMakeLists.txt b/src/bench/CMakeLists.txt index 16eb29250f5..2b006cb14b6 100644 --- a/src/bench/CMakeLists.txt +++ b/src/bench/CMakeLists.txt @@ -27,6 +27,7 @@ add_executable(bench_bitcoin gcs_filter.cpp hashpadding.cpp index_blockfilter.cpp + inputfetcher.cpp load_external.cpp lockedpool.cpp logging.cpp diff --git a/src/bench/inputfetcher.cpp b/src/bench/inputfetcher.cpp new file mode 100644 index 00000000000..66be4a6ff59 --- /dev/null +++ b/src/bench/inputfetcher.cpp @@ -0,0 +1,57 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static constexpr auto QUEUE_BATCH_SIZE{128}; +static constexpr auto DELAY{2ms}; + +//! Simulates a DB by adding a delay when calling GetCoin +class DelayedCoinsView : public CCoinsView +{ +private: + std::chrono::milliseconds m_delay; + +public: + DelayedCoinsView(std::chrono::milliseconds delay) : m_delay(delay) {} + + std::optional GetCoin(const COutPoint& outpoint) const override + { + UninterruptibleSleep(m_delay); + return Coin{}; + } + + bool BatchWrite(CoinsViewCacheCursor& cursor, const uint256 &hashBlock) override { return true; } +}; + +static void InputFetcherBenchmark(benchmark::Bench& bench) +{ + DataStream stream{benchmark::data::block413567}; + CBlock block; + stream >> TX_WITH_WITNESS(block); + + DelayedCoinsView db(DELAY); + CCoinsViewCache cache(&db); + + // The main thread should be counted to prevent thread oversubscription, and + // to decrease the variance of benchmark results. + const auto worker_threads_num{GetNumCores() - 1}; + InputFetcher fetcher{QUEUE_BATCH_SIZE, worker_threads_num}; + + bench.run([&] { + const auto ok{cache.Flush()}; + assert(ok); + fetcher.FetchInputs(cache, db, block); + }); +} + +BENCHMARK(InputFetcherBenchmark, benchmark::PriorityLevel::HIGH); diff --git a/src/coins.cpp b/src/coins.cpp index 24a102b0bc1..10eed6360d5 100644 --- a/src/coins.cpp +++ b/src/coins.cpp @@ -110,10 +110,15 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi (bool)it->second.coin.IsCoinBase()); } -void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin) { - cachedCoinsUsage += coin.DynamicMemoryUsage(); +void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty) { + const auto mem_usage{coin.DynamicMemoryUsage()}; auto [it, inserted] = cacheCoins.try_emplace(std::move(outpoint), std::move(coin)); - if (inserted) CCoinsCacheEntry::SetDirty(*it, m_sentinel); + if (inserted) { + cachedCoinsUsage += mem_usage; + if (set_dirty) { + CCoinsCacheEntry::SetDirty(*it, m_sentinel); + } + } } void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight, bool check_for_overwrite) { diff --git a/src/coins.h b/src/coins.h index 61fb4af6420..71049c89e69 100644 --- a/src/coins.h +++ b/src/coins.h @@ -423,12 +423,13 @@ public: /** * Emplace a coin into cacheCoins without performing any checks, marking - * the emplaced coin as dirty. + * the emplaced coin as dirty unless `set_dirty` is `false`. * - * NOT FOR GENERAL USE. Used only when loading coins from a UTXO snapshot. + * NOT FOR GENERAL USE. Used when loading coins from a UTXO snapshot, and + * in the InputFetcher. * @sa ChainstateManager::PopulateAndValidateSnapshot() */ - void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin); + void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty = true); /** * Spend a coin. Pass moveto in order to get the deleted data. diff --git a/src/inputfetcher.h b/src/inputfetcher.h new file mode 100644 index 00000000000..84ee0f4cdc0 --- /dev/null +++ b/src/inputfetcher.h @@ -0,0 +1,246 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_INPUTFETCHER_H +#define BITCOIN_INPUTFETCHER_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/** + * Input fetcher for fetching inputs from the CoinsDB and inserting + * into the CoinsTip. + * + * The main thread loops through the block and writes all input prevouts to a + * global vector. It then wakes all workers and starts working as well. Each + * thread assigns itself a range of outpoints from the shared vector, and + * fetches the coins from disk. The outpoint and coin pairs are written to a + * thread local vector of pairs. Once all outpoints are fetched, the main thread + * loops through all thread local vectors and writes the pairs to the cache. + */ +class InputFetcher +{ +private: + //! Mutex to protect the inner state + Mutex m_mutex{}; + //! Worker threads block on this when out of work + std::condition_variable m_worker_cv{}; + //! Main thread blocks on this when out of work + std::condition_variable m_main_cv{}; + + /** + * The outpoints to be fetched from disk. + * This is written to on the main thread, then read from all worker + * threads only after the main thread is done writing. Hence, it doesn't + * need to be guarded by a lock. + */ + std::vector m_outpoints{}; + /** + * The index of the last outpoint that is being fetched. Workers assign + * themselves a range of outpoints to fetch from m_outpoints. They will use + * this index as the end of their range, and then set this index to the + * beginning of the range they take for the next worker. Once it gets to + * zero, all outpoints have been assigned and the next worker will wait. + */ + size_t m_last_outpoint_index GUARDED_BY(m_mutex){0}; + + //! The set of txids of the transactions in the current block being fetched. + std::unordered_set m_txids{}; + //! The vector of thread local vectors of pairs to be written to the cache. + std::vector>> m_pairs{}; + + /** + * Number of outpoint fetches that haven't completed yet. + * This includes outpoints that have already been assigned, but are still in + * the worker's own batches. + */ + int32_t m_in_flight_outpoints_count GUARDED_BY(m_mutex){0}; + //! The number of worker threads that are waiting on m_worker_cv + int32_t m_idle_worker_count GUARDED_BY(m_mutex){0}; + //! The maximum number of outpoints to be assigned in one batch + const int32_t m_batch_size; + //! DB coins view to fetch from. + const CCoinsView* m_db{nullptr}; + //! The cache to check if we already have this input. + const CCoinsViewCache* m_cache{nullptr}; + + std::vector m_worker_threads; + bool m_request_stop GUARDED_BY(m_mutex){false}; + + //! Internal function that does the fetching from disk. + void Loop(int32_t index, bool is_main_thread = false) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + auto local_batch_size{0}; + auto end_index{0}; + auto& cond{is_main_thread ? m_main_cv : m_worker_cv}; + do { + { + WAIT_LOCK(m_mutex, lock); + // first do the clean-up of the previous loop run (allowing us to do + // it in the same critsect) local_batch_size will only be + // truthy after first run. + if (local_batch_size) { + m_in_flight_outpoints_count -= local_batch_size; + if (!is_main_thread && m_in_flight_outpoints_count == 0) { + m_main_cv.notify_one(); + } + } + + // logically, the do loop starts here + while (m_last_outpoint_index == 0) { + if ((is_main_thread && m_in_flight_outpoints_count == 0) || m_request_stop) { + return; + } + ++m_idle_worker_count; + cond.wait(lock); + --m_idle_worker_count; + } + + // Assign a batch of outpoints to this thread + local_batch_size = std::max(1, std::min(m_batch_size, + static_cast(m_last_outpoint_index / + (m_worker_threads.size() + 1 + m_idle_worker_count)))); + end_index = m_last_outpoint_index; + m_last_outpoint_index -= local_batch_size; + } + + auto& local_pairs{m_pairs[index]}; + local_pairs.reserve(local_pairs.size() + local_batch_size); + try { + for (auto i{end_index - local_batch_size}; i < end_index; ++i) { + const auto& outpoint{m_outpoints[i]}; + // If an input spends an outpoint from earlier in the + // block, it won't be in the cache yet but it also won't be + // in the db either. + if (m_txids.contains(outpoint.hash)) { + continue; + } + if (m_cache->HaveCoinInCache(outpoint)) { + continue; + } + if (auto coin{m_db->GetCoin(outpoint)}; coin) { + local_pairs.emplace_back(outpoint, std::move(*coin)); + } else { + // Missing an input. This block will fail validation. + // Skip remaining outpoints and continue so main thread + // can proceed. + LOCK(m_mutex); + m_in_flight_outpoints_count -= m_last_outpoint_index; + m_last_outpoint_index = 0; + break; + } + } + } catch (const std::runtime_error&) { + // Database error. This will be handled later in validation. + // Skip remaining outpoints and continue so main thread + // can proceed. + LOCK(m_mutex); + m_in_flight_outpoints_count -= m_last_outpoint_index; + m_last_outpoint_index = 0; + } + } while (true); + } + +public: + + //! Create a new input fetcher + explicit InputFetcher(int32_t batch_size, int32_t worker_thread_count) noexcept + : m_batch_size(batch_size) + { + if (worker_thread_count < 1) { + // Don't do anything if there are no worker threads. + return; + } + m_pairs.reserve(worker_thread_count + 1); + for (auto n{0}; n < worker_thread_count + 1; ++n) { + m_pairs.emplace_back(); + } + m_worker_threads.reserve(worker_thread_count); + for (auto n{0}; n < worker_thread_count; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("inputfetch.%i", n)); + Loop(n); + }); + } + } + + // Since this class manages its own resources, which is a thread + // pool `m_worker_threads`, copy and move operations are not appropriate. + InputFetcher(const InputFetcher&) = delete; + InputFetcher& operator=(const InputFetcher&) = delete; + InputFetcher(InputFetcher&&) = delete; + InputFetcher& operator=(InputFetcher&&) = delete; + + //! Fetch all block inputs from db, and insert into cache. + void FetchInputs(CCoinsViewCache& cache, + const CCoinsView& db, + const CBlock& block) noexcept + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + if (m_worker_threads.empty() || block.vtx.size() <= 1) { + return; + } + + // Set the db and cache to use for this block. + m_db = &db; + m_cache = &cache; + + // Loop through the inputs of the block and add them to the queue + m_txids.reserve(block.vtx.size() - 1); + for (const auto& tx : block.vtx) { + if (tx->IsCoinBase()) { + continue; + } + m_outpoints.reserve(m_outpoints.size() + tx->vin.size()); + for (const auto& in : tx->vin) { + m_outpoints.emplace_back(in.prevout); + } + m_txids.emplace(tx->GetHash()); + } + { + LOCK(m_mutex); + m_in_flight_outpoints_count = m_outpoints.size(); + m_last_outpoint_index = m_outpoints.size(); + } + m_worker_cv.notify_all(); + + // Have the main thread work too while we wait for other threads + Loop(m_worker_threads.size(), /*is_main_thread=*/true); + + // At this point all threads are done writing to m_pairs, so we can + // safely read from it and insert the fetched coins into the cache. + for (auto& local_pairs : m_pairs) { + for (auto&& [outpoint, coin] : local_pairs) { + cache.EmplaceCoinInternalDANGER(std::move(outpoint), + std::move(coin), + /*set_dirty=*/false); + } + local_pairs.clear(); + } + m_txids.clear(); + m_outpoints.clear(); + } + + ~InputFetcher() + { + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + } +}; + +#endif // BITCOIN_INPUTFETCHER_H diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 336377331d9..c1b966a3d8c 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -47,6 +47,7 @@ add_executable(test_bitcoin headers_sync_chainwork_tests.cpp httpserver_tests.cpp i2p_tests.cpp + inputfetcher_tests.cpp interfaces_tests.cpp key_io_tests.cpp key_tests.cpp diff --git a/src/test/fuzz/CMakeLists.txt b/src/test/fuzz/CMakeLists.txt index e99c6d91f47..b0dd23a1d77 100644 --- a/src/test/fuzz/CMakeLists.txt +++ b/src/test/fuzz/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable(fuzz hex.cpp http_request.cpp i2p.cpp + inputfetcher.cpp integer.cpp key.cpp key_io.cpp diff --git a/src/test/fuzz/inputfetcher.cpp b/src/test/fuzz/inputfetcher.cpp new file mode 100644 index 00000000000..5cb3e0f4cc6 --- /dev/null +++ b/src/test/fuzz/inputfetcher.cpp @@ -0,0 +1,151 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using DbMap = std::map, bool>>; + +class DbCoinsView : public CCoinsView +{ +private: + DbMap& m_map; + +public: + DbCoinsView(DbMap& map) : m_map(map) {} + + std::optional GetCoin(const COutPoint& outpoint) const override + { + const auto it{m_map.find(outpoint)}; + assert(it != m_map.end()); + const auto [coin, err] = it->second; + if (err) { + throw std::runtime_error("database error"); + } + return coin; + } +}; + +class NoAccessCoinsView : public CCoinsView +{ +public: + std::optional GetCoin(const COutPoint& outpoint) const override + { + abort(); + } +}; + +FUZZ_TARGET(inputfetcher) +{ + FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size()); + + const auto batch_size{ + fuzzed_data_provider.ConsumeIntegralInRange(0, 1024)}; + const auto worker_threads{ + fuzzed_data_provider.ConsumeIntegralInRange(2, 4)}; + InputFetcher fetcher{batch_size, worker_threads}; + + LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) { + CBlock block; + Txid prevhash{Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider))}; + + DbMap db_map{}; + std::map cache_map{}; + + DbCoinsView db(db_map); + + NoAccessCoinsView back; + CCoinsViewCache cache(&back); + + LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), static_cast(batch_size * worker_threads * 2)) { + CMutableTransaction tx; + + LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10) { + const auto txid{fuzzed_data_provider.ConsumeBool() + ? Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider)) + : prevhash}; + const auto index{fuzzed_data_provider.ConsumeIntegral()}; + const COutPoint outpoint(txid, index); + + tx.vin.emplace_back(outpoint); + + std::optional maybe_coin; + if (fuzzed_data_provider.ConsumeBool()) { + Coin coin{}; + coin.fCoinBase = fuzzed_data_provider.ConsumeBool(); + coin.nHeight = + fuzzed_data_provider.ConsumeIntegralInRange( + 0, std::numeric_limits::max()); + coin.out.nValue = ConsumeMoney(fuzzed_data_provider); + maybe_coin = coin; + } else { + maybe_coin = std::nullopt; + } + db_map.try_emplace(outpoint, std::make_pair( + maybe_coin, + fuzzed_data_provider.ConsumeBool())); + + // Add the coin to the cache + if (fuzzed_data_provider.ConsumeBool()) { + Coin coin{}; + coin.fCoinBase = fuzzed_data_provider.ConsumeBool(); + coin.nHeight = + fuzzed_data_provider.ConsumeIntegralInRange( + 0, std::numeric_limits::max()); + coin.out.nValue = + fuzzed_data_provider.ConsumeIntegralInRange( + -1, MAX_MONEY); + cache_map.try_emplace(outpoint, coin); + cache.EmplaceCoinInternalDANGER( + COutPoint(outpoint), + std::move(coin), + /*set_dirty=*/fuzzed_data_provider.ConsumeBool()); + } + } + + prevhash = tx.GetHash(); + block.vtx.push_back(MakeTransactionRef(tx)); + } + + fetcher.FetchInputs(cache, db, block); + + for (const auto& [outpoint, pair] : db_map) { + // Check pre-existing coins in the cache have not been updated + const auto it{cache_map.find(outpoint)}; + if (it != cache_map.end()) { + const auto& cache_coin{it->second}; + const auto& coin{cache.AccessCoin(outpoint)}; + assert(coin.IsSpent() == cache_coin.IsSpent()); + assert(coin.fCoinBase == cache_coin.fCoinBase); + assert(coin.nHeight == cache_coin.nHeight); + assert(coin.out == cache_coin.out); + continue; + } + + if (!cache.HaveCoinInCache(outpoint)) { + continue; + } + + const auto& [maybe_coin, err] = pair; + assert(maybe_coin && !err); + + // Check any newly added coins in the cache are the same as the db + const auto& coin{cache.AccessCoin(outpoint)}; + assert(!coin.IsSpent()); + assert(coin.fCoinBase == (*maybe_coin).fCoinBase); + assert(coin.nHeight == (*maybe_coin).nHeight); + assert(coin.out == (*maybe_coin).out); + } + } +} diff --git a/src/test/inputfetcher_tests.cpp b/src/test/inputfetcher_tests.cpp new file mode 100644 index 00000000000..054e43049f5 --- /dev/null +++ b/src/test/inputfetcher_tests.cpp @@ -0,0 +1,191 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +BOOST_AUTO_TEST_SUITE(inputfetcher_tests) + +struct InputFetcherTest : BasicTestingSetup { +private: + std::unique_ptr m_fetcher{nullptr}; + std::unique_ptr m_block{nullptr}; + + CBlock CreateBlock(int32_t num_txs) + { + CBlock block; + CMutableTransaction coinbase; + coinbase.vin.emplace_back(); + block.vtx.push_back(MakeTransactionRef(coinbase)); + + Txid prevhash{Txid::FromUint256(uint256(1))}; + + for (auto i{1}; i < num_txs; ++i) { + CMutableTransaction tx; + const auto txid{m_rng.randbool() ? Txid::FromUint256(uint256(i)) : prevhash}; + tx.vin.emplace_back(COutPoint(txid, 0)); + prevhash = tx.GetHash(); + block.vtx.push_back(MakeTransactionRef(tx)); + } + + return block; + } + +public: + explicit InputFetcherTest(const ChainType chainType = ChainType::MAIN, + TestOpts opts = {}) + : BasicTestingSetup{chainType, opts} + { + SeedRandomForTest(SeedRand::ZEROS); + + const auto cores{GetNumCores()}; + const auto num_txs{m_rng.randrange(cores * 10)}; + m_block = std::make_unique(CreateBlock(num_txs)); + const auto batch_size{m_rng.randrange(m_block->vtx.size() * 2)}; + const auto worker_threads{m_rng.randrange(cores * 2) + 1}; + m_fetcher = std::make_unique(batch_size, worker_threads); + } + + InputFetcher& getFetcher() { return *m_fetcher; } + const CBlock& getBlock() { return *m_block; } +}; + +BOOST_FIXTURE_TEST_CASE(fetch_inputs, InputFetcherTest) +{ + const auto& block{getBlock()}; + for (auto i{0}; i < 3; ++i) { + CCoinsView dummy; + CCoinsViewCache db(&dummy); + + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + auto outpoint{in.prevout}; + Coin coin{}; + coin.out.nValue = 1; + db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin)); + } + } + + CCoinsViewCache cache(&db); + getFetcher().FetchInputs(cache, db, block); + + std::unordered_set txids{}; + txids.reserve(block.vtx.size() - 1); + + for (const auto& tx : block.vtx) { + if (tx->IsCoinBase()) { + BOOST_CHECK(!cache.HaveCoinInCache(tx->vin[0].prevout)); + } else { + for (const auto& in : tx->vin) { + const auto& outpoint{in.prevout}; + const auto have{cache.HaveCoinInCache(outpoint)}; + const auto should_have{!txids.contains(outpoint.hash)}; + BOOST_CHECK(should_have ? have : !have); + } + txids.emplace(tx->GetHash()); + } + } + } +} + +// Test for the case where a block spends coins that are spent in the cache, but +// the spentness has not been flushed to the db. So the input fetcher will fetch +// the coin from the db since HaveCoinInCache will return false for an existing +// but spent coin. However, the fetched coin will fail to be inserted into the +// cache because the emplace call in EmplaceCoinInternalDANGER will not insert +// the unspent coin due to the collision with the already spent coin in the map. +BOOST_FIXTURE_TEST_CASE(fetch_no_double_spend, InputFetcherTest) +{ + const auto& block{getBlock()}; + for (auto i{0}; i < 3; ++i) { + CCoinsView dummy; + CCoinsViewCache db(&dummy); + + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + auto outpoint{in.prevout}; + Coin coin{}; + coin.out.nValue = 1; + db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin)); + } + } + + CCoinsViewCache cache(&db); + + // Add all inputs as spent already in cache + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + auto outpoint{in.prevout}; + Coin coin{}; + cache.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin)); + } + } + + getFetcher().FetchInputs(cache, db, block); + + // Coins are still spent, even though they exist unspent in the parent db + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + BOOST_CHECK(!cache.HaveCoinInCache(in.prevout)); + } + } + } +} + +BOOST_FIXTURE_TEST_CASE(fetch_no_inputs, InputFetcherTest) +{ + const auto& block{getBlock()}; + for (auto i{0}; i < 3; ++i) { + CCoinsView db; + CCoinsViewCache cache(&db); + getFetcher().FetchInputs(cache, db, block); + + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + BOOST_CHECK(!cache.HaveCoinInCache(in.prevout)); + } + } + } +} + +class ThrowCoinsView : public CCoinsView +{ + std::optional GetCoin(const COutPoint& outpoint) const override + { + throw std::runtime_error("database error"); + } +}; + +BOOST_FIXTURE_TEST_CASE(fetch_input_exceptions, InputFetcherTest) +{ + const auto& block{getBlock()}; + for (auto i{0}; i < 3; ++i) { + ThrowCoinsView db; + CCoinsViewCache cache(&db); + getFetcher().FetchInputs(cache, db, block); + + for (const auto& tx : block.vtx) { + for (const auto& in : tx->vin) { + BOOST_CHECK(!cache.HaveCoinInCache(in.prevout)); + } + } + } +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/validation.cpp b/src/validation.cpp index bf370d171a5..9354207299b 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -3196,6 +3196,8 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew, LogDebug(BCLog::BENCH, " - Load block from disk: %.2fms\n", Ticks(time_2 - time_1)); { + m_chainman.GetInputFetcher().FetchInputs(CoinsTip(), CoinsDB(), blockConnecting); + CCoinsViewCache view(&CoinsTip()); bool rv = ConnectBlock(blockConnecting, state, pindexNew, view); if (m_chainman.m_options.signals) { @@ -6296,6 +6298,7 @@ static ChainstateManager::Options&& Flatten(ChainstateManager::Options&& opts) ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Options options, node::BlockManager::Options blockman_options) : m_script_check_queue{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)}, + m_input_fetcher{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)}, m_interrupt{interrupt}, m_options{Flatten(std::move(options))}, m_blockman{interrupt, std::move(blockman_options)}, diff --git a/src/validation.h b/src/validation.h index f6cbee28fc5..edd387f0b52 100644 --- a/src/validation.h +++ b/src/validation.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -948,6 +949,7 @@ private: //! A queue for script verifications that have to be performed by worker threads. CCheckQueue m_script_check_queue; + InputFetcher m_input_fetcher; //! Timers and counters used for benchmarking validation in both background //! and active chainstates. @@ -1323,6 +1325,7 @@ public: void RecalculateBestHeader() EXCLUSIVE_LOCKS_REQUIRED(::cs_main); CCheckQueue& GetCheckQueue() { return m_script_check_queue; } + InputFetcher& GetInputFetcher() { return m_input_fetcher; } ~ChainstateManager(); };