mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-03-17 21:32:00 +01:00
Merge b2da76444613229e0bf5539596903ac3c1db3053 into db2c57ae9eebdb75c58cd165ac929919969c19a9
This commit is contained in:
commit
012ccb394c
@ -27,6 +27,7 @@ add_executable(bench_bitcoin
|
||||
gcs_filter.cpp
|
||||
hashpadding.cpp
|
||||
index_blockfilter.cpp
|
||||
inputfetcher.cpp
|
||||
load_external.cpp
|
||||
lockedpool.cpp
|
||||
logging.cpp
|
||||
|
57
src/bench/inputfetcher.cpp
Normal file
57
src/bench/inputfetcher.cpp
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <bench/bench.h>
|
||||
#include <bench/data/block413567.raw.h>
|
||||
#include <coins.h>
|
||||
#include <common/system.h>
|
||||
#include <inputfetcher.h>
|
||||
#include <primitives/block.h>
|
||||
#include <serialize.h>
|
||||
#include <streams.h>
|
||||
#include <util/time.h>
|
||||
|
||||
static constexpr auto QUEUE_BATCH_SIZE{128};
|
||||
static constexpr auto DELAY{2ms};
|
||||
|
||||
//! Simulates a DB by adding a delay when calling GetCoin
|
||||
class DelayedCoinsView : public CCoinsView
|
||||
{
|
||||
private:
|
||||
std::chrono::milliseconds m_delay;
|
||||
|
||||
public:
|
||||
DelayedCoinsView(std::chrono::milliseconds delay) : m_delay(delay) {}
|
||||
|
||||
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||
{
|
||||
UninterruptibleSleep(m_delay);
|
||||
return Coin{};
|
||||
}
|
||||
|
||||
bool BatchWrite(CoinsViewCacheCursor& cursor, const uint256 &hashBlock) override { return true; }
|
||||
};
|
||||
|
||||
static void InputFetcherBenchmark(benchmark::Bench& bench)
|
||||
{
|
||||
DataStream stream{benchmark::data::block413567};
|
||||
CBlock block;
|
||||
stream >> TX_WITH_WITNESS(block);
|
||||
|
||||
DelayedCoinsView db(DELAY);
|
||||
CCoinsViewCache cache(&db);
|
||||
|
||||
// The main thread should be counted to prevent thread oversubscription, and
|
||||
// to decrease the variance of benchmark results.
|
||||
const auto worker_threads_num{GetNumCores() - 1};
|
||||
InputFetcher fetcher{QUEUE_BATCH_SIZE, worker_threads_num};
|
||||
|
||||
bench.run([&] {
|
||||
const auto ok{cache.Flush()};
|
||||
assert(ok);
|
||||
fetcher.FetchInputs(cache, db, block);
|
||||
});
|
||||
}
|
||||
|
||||
BENCHMARK(InputFetcherBenchmark, benchmark::PriorityLevel::HIGH);
|
@ -110,10 +110,15 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi
|
||||
(bool)it->second.coin.IsCoinBase());
|
||||
}
|
||||
|
||||
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin) {
|
||||
cachedCoinsUsage += coin.DynamicMemoryUsage();
|
||||
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty) {
|
||||
const auto mem_usage{coin.DynamicMemoryUsage()};
|
||||
auto [it, inserted] = cacheCoins.try_emplace(std::move(outpoint), std::move(coin));
|
||||
if (inserted) CCoinsCacheEntry::SetDirty(*it, m_sentinel);
|
||||
if (inserted) {
|
||||
cachedCoinsUsage += mem_usage;
|
||||
if (set_dirty) {
|
||||
CCoinsCacheEntry::SetDirty(*it, m_sentinel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight, bool check_for_overwrite) {
|
||||
|
@ -423,12 +423,13 @@ public:
|
||||
|
||||
/**
|
||||
* Emplace a coin into cacheCoins without performing any checks, marking
|
||||
* the emplaced coin as dirty.
|
||||
* the emplaced coin as dirty unless `set_dirty` is `false`.
|
||||
*
|
||||
* NOT FOR GENERAL USE. Used only when loading coins from a UTXO snapshot.
|
||||
* NOT FOR GENERAL USE. Used when loading coins from a UTXO snapshot, and
|
||||
* in the InputFetcher.
|
||||
* @sa ChainstateManager::PopulateAndValidateSnapshot()
|
||||
*/
|
||||
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin);
|
||||
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty = true);
|
||||
|
||||
/**
|
||||
* Spend a coin. Pass moveto in order to get the deleted data.
|
||||
|
246
src/inputfetcher.h
Normal file
246
src/inputfetcher.h
Normal file
@ -0,0 +1,246 @@
|
||||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_INPUTFETCHER_H
|
||||
#define BITCOIN_INPUTFETCHER_H
|
||||
|
||||
#include <coins.h>
|
||||
#include <sync.h>
|
||||
#include <tinyformat.h>
|
||||
#include <txdb.h>
|
||||
#include <util/hasher.h>
|
||||
#include <util/threadnames.h>
|
||||
#include <util/transaction_identifier.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <stdexcept>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
/**
|
||||
* Input fetcher for fetching inputs from the CoinsDB and inserting
|
||||
* into the CoinsTip.
|
||||
*
|
||||
* The main thread loops through the block and writes all input prevouts to a
|
||||
* global vector. It then wakes all workers and starts working as well. Each
|
||||
* thread assigns itself a range of outpoints from the shared vector, and
|
||||
* fetches the coins from disk. The outpoint and coin pairs are written to a
|
||||
* thread local vector of pairs. Once all outpoints are fetched, the main thread
|
||||
* loops through all thread local vectors and writes the pairs to the cache.
|
||||
*/
|
||||
class InputFetcher
|
||||
{
|
||||
private:
|
||||
//! Mutex to protect the inner state
|
||||
Mutex m_mutex{};
|
||||
//! Worker threads block on this when out of work
|
||||
std::condition_variable m_worker_cv{};
|
||||
//! Main thread blocks on this when out of work
|
||||
std::condition_variable m_main_cv{};
|
||||
|
||||
/**
|
||||
* The outpoints to be fetched from disk.
|
||||
* This is written to on the main thread, then read from all worker
|
||||
* threads only after the main thread is done writing. Hence, it doesn't
|
||||
* need to be guarded by a lock.
|
||||
*/
|
||||
std::vector<COutPoint> m_outpoints{};
|
||||
/**
|
||||
* The index of the last outpoint that is being fetched. Workers assign
|
||||
* themselves a range of outpoints to fetch from m_outpoints. They will use
|
||||
* this index as the end of their range, and then set this index to the
|
||||
* beginning of the range they take for the next worker. Once it gets to
|
||||
* zero, all outpoints have been assigned and the next worker will wait.
|
||||
*/
|
||||
size_t m_last_outpoint_index GUARDED_BY(m_mutex){0};
|
||||
|
||||
//! The set of txids of the transactions in the current block being fetched.
|
||||
std::unordered_set<Txid, SaltedTxidHasher> m_txids{};
|
||||
//! The vector of thread local vectors of pairs to be written to the cache.
|
||||
std::vector<std::vector<std::pair<COutPoint, Coin>>> m_pairs{};
|
||||
|
||||
/**
|
||||
* Number of outpoint fetches that haven't completed yet.
|
||||
* This includes outpoints that have already been assigned, but are still in
|
||||
* the worker's own batches.
|
||||
*/
|
||||
int32_t m_in_flight_outpoints_count GUARDED_BY(m_mutex){0};
|
||||
//! The number of worker threads that are waiting on m_worker_cv
|
||||
int32_t m_idle_worker_count GUARDED_BY(m_mutex){0};
|
||||
//! The maximum number of outpoints to be assigned in one batch
|
||||
const int32_t m_batch_size;
|
||||
//! DB coins view to fetch from.
|
||||
const CCoinsView* m_db{nullptr};
|
||||
//! The cache to check if we already have this input.
|
||||
const CCoinsViewCache* m_cache{nullptr};
|
||||
|
||||
std::vector<std::thread> m_worker_threads;
|
||||
bool m_request_stop GUARDED_BY(m_mutex){false};
|
||||
|
||||
//! Internal function that does the fetching from disk.
|
||||
void Loop(int32_t index, bool is_main_thread = false) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
auto local_batch_size{0};
|
||||
auto end_index{0};
|
||||
auto& cond{is_main_thread ? m_main_cv : m_worker_cv};
|
||||
do {
|
||||
{
|
||||
WAIT_LOCK(m_mutex, lock);
|
||||
// first do the clean-up of the previous loop run (allowing us to do
|
||||
// it in the same critsect) local_batch_size will only be
|
||||
// truthy after first run.
|
||||
if (local_batch_size) {
|
||||
m_in_flight_outpoints_count -= local_batch_size;
|
||||
if (!is_main_thread && m_in_flight_outpoints_count == 0) {
|
||||
m_main_cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
// logically, the do loop starts here
|
||||
while (m_last_outpoint_index == 0) {
|
||||
if ((is_main_thread && m_in_flight_outpoints_count == 0) || m_request_stop) {
|
||||
return;
|
||||
}
|
||||
++m_idle_worker_count;
|
||||
cond.wait(lock);
|
||||
--m_idle_worker_count;
|
||||
}
|
||||
|
||||
// Assign a batch of outpoints to this thread
|
||||
local_batch_size = std::max(1, std::min(m_batch_size,
|
||||
static_cast<int32_t>(m_last_outpoint_index /
|
||||
(m_worker_threads.size() + 1 + m_idle_worker_count))));
|
||||
end_index = m_last_outpoint_index;
|
||||
m_last_outpoint_index -= local_batch_size;
|
||||
}
|
||||
|
||||
auto& local_pairs{m_pairs[index]};
|
||||
local_pairs.reserve(local_pairs.size() + local_batch_size);
|
||||
try {
|
||||
for (auto i{end_index - local_batch_size}; i < end_index; ++i) {
|
||||
const auto& outpoint{m_outpoints[i]};
|
||||
// If an input spends an outpoint from earlier in the
|
||||
// block, it won't be in the cache yet but it also won't be
|
||||
// in the db either.
|
||||
if (m_txids.contains(outpoint.hash)) {
|
||||
continue;
|
||||
}
|
||||
if (m_cache->HaveCoinInCache(outpoint)) {
|
||||
continue;
|
||||
}
|
||||
if (auto coin{m_db->GetCoin(outpoint)}; coin) {
|
||||
local_pairs.emplace_back(outpoint, std::move(*coin));
|
||||
} else {
|
||||
// Missing an input. This block will fail validation.
|
||||
// Skip remaining outpoints and continue so main thread
|
||||
// can proceed.
|
||||
LOCK(m_mutex);
|
||||
m_in_flight_outpoints_count -= m_last_outpoint_index;
|
||||
m_last_outpoint_index = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (const std::runtime_error&) {
|
||||
// Database error. This will be handled later in validation.
|
||||
// Skip remaining outpoints and continue so main thread
|
||||
// can proceed.
|
||||
LOCK(m_mutex);
|
||||
m_in_flight_outpoints_count -= m_last_outpoint_index;
|
||||
m_last_outpoint_index = 0;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
//! Create a new input fetcher
|
||||
explicit InputFetcher(int32_t batch_size, int32_t worker_thread_count) noexcept
|
||||
: m_batch_size(batch_size)
|
||||
{
|
||||
if (worker_thread_count < 1) {
|
||||
// Don't do anything if there are no worker threads.
|
||||
return;
|
||||
}
|
||||
m_pairs.reserve(worker_thread_count + 1);
|
||||
for (auto n{0}; n < worker_thread_count + 1; ++n) {
|
||||
m_pairs.emplace_back();
|
||||
}
|
||||
m_worker_threads.reserve(worker_thread_count);
|
||||
for (auto n{0}; n < worker_thread_count; ++n) {
|
||||
m_worker_threads.emplace_back([this, n]() {
|
||||
util::ThreadRename(strprintf("inputfetch.%i", n));
|
||||
Loop(n);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Since this class manages its own resources, which is a thread
|
||||
// pool `m_worker_threads`, copy and move operations are not appropriate.
|
||||
InputFetcher(const InputFetcher&) = delete;
|
||||
InputFetcher& operator=(const InputFetcher&) = delete;
|
||||
InputFetcher(InputFetcher&&) = delete;
|
||||
InputFetcher& operator=(InputFetcher&&) = delete;
|
||||
|
||||
//! Fetch all block inputs from db, and insert into cache.
|
||||
void FetchInputs(CCoinsViewCache& cache,
|
||||
const CCoinsView& db,
|
||||
const CBlock& block) noexcept
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||
{
|
||||
if (m_worker_threads.empty() || block.vtx.size() <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Set the db and cache to use for this block.
|
||||
m_db = &db;
|
||||
m_cache = &cache;
|
||||
|
||||
// Loop through the inputs of the block and add them to the queue
|
||||
m_txids.reserve(block.vtx.size() - 1);
|
||||
for (const auto& tx : block.vtx) {
|
||||
if (tx->IsCoinBase()) {
|
||||
continue;
|
||||
}
|
||||
m_outpoints.reserve(m_outpoints.size() + tx->vin.size());
|
||||
for (const auto& in : tx->vin) {
|
||||
m_outpoints.emplace_back(in.prevout);
|
||||
}
|
||||
m_txids.emplace(tx->GetHash());
|
||||
}
|
||||
{
|
||||
LOCK(m_mutex);
|
||||
m_in_flight_outpoints_count = m_outpoints.size();
|
||||
m_last_outpoint_index = m_outpoints.size();
|
||||
}
|
||||
m_worker_cv.notify_all();
|
||||
|
||||
// Have the main thread work too while we wait for other threads
|
||||
Loop(m_worker_threads.size(), /*is_main_thread=*/true);
|
||||
|
||||
// At this point all threads are done writing to m_pairs, so we can
|
||||
// safely read from it and insert the fetched coins into the cache.
|
||||
for (auto& local_pairs : m_pairs) {
|
||||
for (auto&& [outpoint, coin] : local_pairs) {
|
||||
cache.EmplaceCoinInternalDANGER(std::move(outpoint),
|
||||
std::move(coin),
|
||||
/*set_dirty=*/false);
|
||||
}
|
||||
local_pairs.clear();
|
||||
}
|
||||
m_txids.clear();
|
||||
m_outpoints.clear();
|
||||
}
|
||||
|
||||
~InputFetcher()
|
||||
{
|
||||
WITH_LOCK(m_mutex, m_request_stop = true);
|
||||
m_worker_cv.notify_all();
|
||||
for (std::thread& t : m_worker_threads) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#endif // BITCOIN_INPUTFETCHER_H
|
@ -47,6 +47,7 @@ add_executable(test_bitcoin
|
||||
headers_sync_chainwork_tests.cpp
|
||||
httpserver_tests.cpp
|
||||
i2p_tests.cpp
|
||||
inputfetcher_tests.cpp
|
||||
interfaces_tests.cpp
|
||||
key_io_tests.cpp
|
||||
key_tests.cpp
|
||||
|
@ -53,6 +53,7 @@ add_executable(fuzz
|
||||
hex.cpp
|
||||
http_request.cpp
|
||||
i2p.cpp
|
||||
inputfetcher.cpp
|
||||
integer.cpp
|
||||
key.cpp
|
||||
key_io.cpp
|
||||
|
151
src/test/fuzz/inputfetcher.cpp
Normal file
151
src/test/fuzz/inputfetcher.cpp
Normal file
@ -0,0 +1,151 @@
|
||||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <consensus/amount.h>
|
||||
#include <inputfetcher.h>
|
||||
#include <test/fuzz/FuzzedDataProvider.h>
|
||||
#include <test/fuzz/fuzz.h>
|
||||
#include <test/fuzz/util.h>
|
||||
#include <util/transaction_identifier.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <utility>
|
||||
|
||||
using DbMap = std::map<const COutPoint, std::pair<std::optional<const Coin>, bool>>;
|
||||
|
||||
class DbCoinsView : public CCoinsView
|
||||
{
|
||||
private:
|
||||
DbMap& m_map;
|
||||
|
||||
public:
|
||||
DbCoinsView(DbMap& map) : m_map(map) {}
|
||||
|
||||
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||
{
|
||||
const auto it{m_map.find(outpoint)};
|
||||
assert(it != m_map.end());
|
||||
const auto [coin, err] = it->second;
|
||||
if (err) {
|
||||
throw std::runtime_error("database error");
|
||||
}
|
||||
return coin;
|
||||
}
|
||||
};
|
||||
|
||||
class NoAccessCoinsView : public CCoinsView
|
||||
{
|
||||
public:
|
||||
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||
{
|
||||
abort();
|
||||
}
|
||||
};
|
||||
|
||||
FUZZ_TARGET(inputfetcher)
|
||||
{
|
||||
FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
|
||||
|
||||
const auto batch_size{
|
||||
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(0, 1024)};
|
||||
const auto worker_threads{
|
||||
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(2, 4)};
|
||||
InputFetcher fetcher{batch_size, worker_threads};
|
||||
|
||||
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) {
|
||||
CBlock block;
|
||||
Txid prevhash{Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider))};
|
||||
|
||||
DbMap db_map{};
|
||||
std::map<const COutPoint, const Coin> cache_map{};
|
||||
|
||||
DbCoinsView db(db_map);
|
||||
|
||||
NoAccessCoinsView back;
|
||||
CCoinsViewCache cache(&back);
|
||||
|
||||
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), static_cast<uint32_t>(batch_size * worker_threads * 2)) {
|
||||
CMutableTransaction tx;
|
||||
|
||||
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10) {
|
||||
const auto txid{fuzzed_data_provider.ConsumeBool()
|
||||
? Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider))
|
||||
: prevhash};
|
||||
const auto index{fuzzed_data_provider.ConsumeIntegral<uint32_t>()};
|
||||
const COutPoint outpoint(txid, index);
|
||||
|
||||
tx.vin.emplace_back(outpoint);
|
||||
|
||||
std::optional<Coin> maybe_coin;
|
||||
if (fuzzed_data_provider.ConsumeBool()) {
|
||||
Coin coin{};
|
||||
coin.fCoinBase = fuzzed_data_provider.ConsumeBool();
|
||||
coin.nHeight =
|
||||
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(
|
||||
0, std::numeric_limits<int32_t>::max());
|
||||
coin.out.nValue = ConsumeMoney(fuzzed_data_provider);
|
||||
maybe_coin = coin;
|
||||
} else {
|
||||
maybe_coin = std::nullopt;
|
||||
}
|
||||
db_map.try_emplace(outpoint, std::make_pair(
|
||||
maybe_coin,
|
||||
fuzzed_data_provider.ConsumeBool()));
|
||||
|
||||
// Add the coin to the cache
|
||||
if (fuzzed_data_provider.ConsumeBool()) {
|
||||
Coin coin{};
|
||||
coin.fCoinBase = fuzzed_data_provider.ConsumeBool();
|
||||
coin.nHeight =
|
||||
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(
|
||||
0, std::numeric_limits<int32_t>::max());
|
||||
coin.out.nValue =
|
||||
fuzzed_data_provider.ConsumeIntegralInRange<int64_t>(
|
||||
-1, MAX_MONEY);
|
||||
cache_map.try_emplace(outpoint, coin);
|
||||
cache.EmplaceCoinInternalDANGER(
|
||||
COutPoint(outpoint),
|
||||
std::move(coin),
|
||||
/*set_dirty=*/fuzzed_data_provider.ConsumeBool());
|
||||
}
|
||||
}
|
||||
|
||||
prevhash = tx.GetHash();
|
||||
block.vtx.push_back(MakeTransactionRef(tx));
|
||||
}
|
||||
|
||||
fetcher.FetchInputs(cache, db, block);
|
||||
|
||||
for (const auto& [outpoint, pair] : db_map) {
|
||||
// Check pre-existing coins in the cache have not been updated
|
||||
const auto it{cache_map.find(outpoint)};
|
||||
if (it != cache_map.end()) {
|
||||
const auto& cache_coin{it->second};
|
||||
const auto& coin{cache.AccessCoin(outpoint)};
|
||||
assert(coin.IsSpent() == cache_coin.IsSpent());
|
||||
assert(coin.fCoinBase == cache_coin.fCoinBase);
|
||||
assert(coin.nHeight == cache_coin.nHeight);
|
||||
assert(coin.out == cache_coin.out);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!cache.HaveCoinInCache(outpoint)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto& [maybe_coin, err] = pair;
|
||||
assert(maybe_coin && !err);
|
||||
|
||||
// Check any newly added coins in the cache are the same as the db
|
||||
const auto& coin{cache.AccessCoin(outpoint)};
|
||||
assert(!coin.IsSpent());
|
||||
assert(coin.fCoinBase == (*maybe_coin).fCoinBase);
|
||||
assert(coin.nHeight == (*maybe_coin).nHeight);
|
||||
assert(coin.out == (*maybe_coin).out);
|
||||
}
|
||||
}
|
||||
}
|
191
src/test/inputfetcher_tests.cpp
Normal file
191
src/test/inputfetcher_tests.cpp
Normal file
@ -0,0 +1,191 @@
|
||||
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <coins.h>
|
||||
#include <common/system.h>
|
||||
#include <inputfetcher.h>
|
||||
#include <primitives/block.h>
|
||||
#include <primitives/transaction.h>
|
||||
#include <test/util/random.h>
|
||||
#include <test/util/setup_common.h>
|
||||
#include <uint256.h>
|
||||
#include <util/transaction_identifier.h>
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(inputfetcher_tests)
|
||||
|
||||
struct InputFetcherTest : BasicTestingSetup {
|
||||
private:
|
||||
std::unique_ptr<InputFetcher> m_fetcher{nullptr};
|
||||
std::unique_ptr<CBlock> m_block{nullptr};
|
||||
|
||||
CBlock CreateBlock(int32_t num_txs)
|
||||
{
|
||||
CBlock block;
|
||||
CMutableTransaction coinbase;
|
||||
coinbase.vin.emplace_back();
|
||||
block.vtx.push_back(MakeTransactionRef(coinbase));
|
||||
|
||||
Txid prevhash{Txid::FromUint256(uint256(1))};
|
||||
|
||||
for (auto i{1}; i < num_txs; ++i) {
|
||||
CMutableTransaction tx;
|
||||
const auto txid{m_rng.randbool() ? Txid::FromUint256(uint256(i)) : prevhash};
|
||||
tx.vin.emplace_back(COutPoint(txid, 0));
|
||||
prevhash = tx.GetHash();
|
||||
block.vtx.push_back(MakeTransactionRef(tx));
|
||||
}
|
||||
|
||||
return block;
|
||||
}
|
||||
|
||||
public:
|
||||
explicit InputFetcherTest(const ChainType chainType = ChainType::MAIN,
|
||||
TestOpts opts = {})
|
||||
: BasicTestingSetup{chainType, opts}
|
||||
{
|
||||
SeedRandomForTest(SeedRand::ZEROS);
|
||||
|
||||
const auto cores{GetNumCores()};
|
||||
const auto num_txs{m_rng.randrange(cores * 10)};
|
||||
m_block = std::make_unique<CBlock>(CreateBlock(num_txs));
|
||||
const auto batch_size{m_rng.randrange<int32_t>(m_block->vtx.size() * 2)};
|
||||
const auto worker_threads{m_rng.randrange(cores * 2) + 1};
|
||||
m_fetcher = std::make_unique<InputFetcher>(batch_size, worker_threads);
|
||||
}
|
||||
|
||||
InputFetcher& getFetcher() { return *m_fetcher; }
|
||||
const CBlock& getBlock() { return *m_block; }
|
||||
};
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(fetch_inputs, InputFetcherTest)
|
||||
{
|
||||
const auto& block{getBlock()};
|
||||
for (auto i{0}; i < 3; ++i) {
|
||||
CCoinsView dummy;
|
||||
CCoinsViewCache db(&dummy);
|
||||
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
auto outpoint{in.prevout};
|
||||
Coin coin{};
|
||||
coin.out.nValue = 1;
|
||||
db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||
}
|
||||
}
|
||||
|
||||
CCoinsViewCache cache(&db);
|
||||
getFetcher().FetchInputs(cache, db, block);
|
||||
|
||||
std::unordered_set<Txid, SaltedTxidHasher> txids{};
|
||||
txids.reserve(block.vtx.size() - 1);
|
||||
|
||||
for (const auto& tx : block.vtx) {
|
||||
if (tx->IsCoinBase()) {
|
||||
BOOST_CHECK(!cache.HaveCoinInCache(tx->vin[0].prevout));
|
||||
} else {
|
||||
for (const auto& in : tx->vin) {
|
||||
const auto& outpoint{in.prevout};
|
||||
const auto have{cache.HaveCoinInCache(outpoint)};
|
||||
const auto should_have{!txids.contains(outpoint.hash)};
|
||||
BOOST_CHECK(should_have ? have : !have);
|
||||
}
|
||||
txids.emplace(tx->GetHash());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test for the case where a block spends coins that are spent in the cache, but
|
||||
// the spentness has not been flushed to the db. So the input fetcher will fetch
|
||||
// the coin from the db since HaveCoinInCache will return false for an existing
|
||||
// but spent coin. However, the fetched coin will fail to be inserted into the
|
||||
// cache because the emplace call in EmplaceCoinInternalDANGER will not insert
|
||||
// the unspent coin due to the collision with the already spent coin in the map.
|
||||
BOOST_FIXTURE_TEST_CASE(fetch_no_double_spend, InputFetcherTest)
|
||||
{
|
||||
const auto& block{getBlock()};
|
||||
for (auto i{0}; i < 3; ++i) {
|
||||
CCoinsView dummy;
|
||||
CCoinsViewCache db(&dummy);
|
||||
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
auto outpoint{in.prevout};
|
||||
Coin coin{};
|
||||
coin.out.nValue = 1;
|
||||
db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||
}
|
||||
}
|
||||
|
||||
CCoinsViewCache cache(&db);
|
||||
|
||||
// Add all inputs as spent already in cache
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
auto outpoint{in.prevout};
|
||||
Coin coin{};
|
||||
cache.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||
}
|
||||
}
|
||||
|
||||
getFetcher().FetchInputs(cache, db, block);
|
||||
|
||||
// Coins are still spent, even though they exist unspent in the parent db
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(fetch_no_inputs, InputFetcherTest)
|
||||
{
|
||||
const auto& block{getBlock()};
|
||||
for (auto i{0}; i < 3; ++i) {
|
||||
CCoinsView db;
|
||||
CCoinsViewCache cache(&db);
|
||||
getFetcher().FetchInputs(cache, db, block);
|
||||
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ThrowCoinsView : public CCoinsView
|
||||
{
|
||||
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||
{
|
||||
throw std::runtime_error("database error");
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(fetch_input_exceptions, InputFetcherTest)
|
||||
{
|
||||
const auto& block{getBlock()};
|
||||
for (auto i{0}; i < 3; ++i) {
|
||||
ThrowCoinsView db;
|
||||
CCoinsViewCache cache(&db);
|
||||
getFetcher().FetchInputs(cache, db, block);
|
||||
|
||||
for (const auto& tx : block.vtx) {
|
||||
for (const auto& in : tx->vin) {
|
||||
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
@ -3196,6 +3196,8 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew,
|
||||
LogDebug(BCLog::BENCH, " - Load block from disk: %.2fms\n",
|
||||
Ticks<MillisecondsDouble>(time_2 - time_1));
|
||||
{
|
||||
m_chainman.GetInputFetcher().FetchInputs(CoinsTip(), CoinsDB(), blockConnecting);
|
||||
|
||||
CCoinsViewCache view(&CoinsTip());
|
||||
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
|
||||
if (m_chainman.m_options.signals) {
|
||||
@ -6296,6 +6298,7 @@ static ChainstateManager::Options&& Flatten(ChainstateManager::Options&& opts)
|
||||
|
||||
ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Options options, node::BlockManager::Options blockman_options)
|
||||
: m_script_check_queue{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)},
|
||||
m_input_fetcher{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)},
|
||||
m_interrupt{interrupt},
|
||||
m_options{Flatten(std::move(options))},
|
||||
m_blockman{interrupt, std::move(blockman_options)},
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <consensus/amount.h>
|
||||
#include <cuckoocache.h>
|
||||
#include <deploymentstatus.h>
|
||||
#include <inputfetcher.h>
|
||||
#include <kernel/chain.h>
|
||||
#include <kernel/chainparams.h>
|
||||
#include <kernel/chainstatemanager_opts.h>
|
||||
@ -948,6 +949,7 @@ private:
|
||||
|
||||
//! A queue for script verifications that have to be performed by worker threads.
|
||||
CCheckQueue<CScriptCheck> m_script_check_queue;
|
||||
InputFetcher m_input_fetcher;
|
||||
|
||||
//! Timers and counters used for benchmarking validation in both background
|
||||
//! and active chainstates.
|
||||
@ -1323,6 +1325,7 @@ public:
|
||||
void RecalculateBestHeader() EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
|
||||
|
||||
CCheckQueue<CScriptCheck>& GetCheckQueue() { return m_script_check_queue; }
|
||||
InputFetcher& GetInputFetcher() { return m_input_fetcher; }
|
||||
|
||||
~ChainstateManager();
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user