Merge bitcoin/bitcoin#35455: fuzz: improve dbwrapper_concurrent_reads performance

1ce9e26239 fuzz: improve dbwrapper_concurrent_reads performance (Andrew Toth)

Pull request description:

  The recently merged fuzz harness targeting concurrent reads suffers from poor performance and memory leaks (https://github.com/bitcoin/bitcoin/pull/34866#issuecomment-4614323925).

  Fix this by:
  - using a global thread pool instead of a local one per iteration
  - reducing the thread count from 16 to 8
  - using a std::map oracle to check results inline, instead of reading from the db to get a baseline and storing results

ACKs for top commit:
  marcofleon:
    reACK 1ce9e26239
  l0rinc:
    ACK 1ce9e26239
  sedited:
    ACK 1ce9e26239

Tree-SHA512: 2e532caf246f389105e4a9b487496386d1fe9add7b27fba9ecbbf51a432ef493765ad7095288dd7e0a896860ff150d89ecb6afb8baf311a4af94d8e01b77dba5
This commit is contained in:
merge-script
2026-06-11 09:47:12 +02:00

View File

@@ -167,7 +167,16 @@ void VerifyIterator(CDBWrapper& dbw, const Oracle& oracle,
}
/** Maximum number of concurrent reader threads in dbwrapper_concurrent_reads. */
constexpr size_t MAX_READ_WORKERS{16};
constexpr size_t MAX_READ_WORKERS{8};
ThreadPool g_read_pool{"dbfuzz"};
Mutex g_read_pool_mutex;
/**
 * Start the shared reader pool with MAX_READ_WORKERS workers, unless it already
 * reports a non-zero worker count. The check and the start both happen while
 * holding g_read_pool_mutex.
 */
void StartReadPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex)
{
    LOCK(g_read_pool_mutex);
    // A pool that reports no workers has not been started yet.
    const bool pool_idle{!g_read_pool.WorkersCount()};
    if (pool_idle) {
        g_read_pool.Start(MAX_READ_WORKERS);
    }
}
/** Build randomized DBParams from the fuzz input, shared by all targets. */
DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env,
@@ -185,39 +194,6 @@ DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env
};
}
/** A single read-only operation run concurrently in dbwrapper_concurrent_reads. */
enum class ReadOp { Read, Exists, IteratorSeek };
using ReadQuery = std::tuple<ReadOp, uint16_t>;
using Results = std::vector<std::optional<std::string>>;
/**
 * Run every query in `queries` against `db`, visiting them in an order shuffled
 * with `rng`, and return one slot per query at the query's original index.
 *
 * A slot is left empty when the operation reports a miss. On a hit it holds:
 *  - Read:         the value read from the database,
 *  - Exists:       an empty string (only presence is recorded),
 *  - IteratorSeek: the value at the position the iterator landed on.
 */
Results RunReadQueries(CDBWrapper& db, const std::vector<ReadQuery>& queries, FastRandomContext& rng)
{
    // Shuffle the visiting order only; results stay indexed by query position.
    std::vector<size_t> visit_order(queries.size());
    std::iota(visit_order.begin(), visit_order.end(), size_t{0});
    std::shuffle(visit_order.begin(), visit_order.end(), rng);

    Results out(queries.size());
    for (const size_t idx : visit_order) {
        const ReadOp op{std::get<0>(queries[idx])};
        const uint16_t key{std::get<1>(queries[idx])};

        std::string value;
        bool hit{false};
        if (op == ReadOp::Read) {
            hit = db.Read(key, value);
        } else if (op == ReadOp::Exists) {
            // Nothing is read here, so `value` stays empty on a hit.
            hit = db.Exists(key);
        } else if (op == ReadOp::IteratorSeek) {
            const std::unique_ptr<CDBIterator> cursor{db.NewIterator()};
            cursor->Seek(key);
            hit = cursor->Valid() && cursor->GetValue(value);
        }

        if (hit) out[idx] = std::move(value);
    }
    return out;
}
template <typename DrainWorkFn, typename RunOneFn>
void TestDbWrapper(FuzzedDataProvider& provider,
leveldb::Env* testing_env,
@@ -381,8 +357,9 @@ FUZZ_TARGET(dbwrapper_threaded, .init = [] { static auto setup{MakeNoLogFileCont
/*allow_force_compact=*/true);
}
FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; })
FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; }) EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex)
{
StartReadPoolIfNeeded();
SeedRandomStateForTest(SeedRand::ZEROS);
FuzzedDataProvider provider{buffer.data(), buffer.size()};
@@ -394,17 +371,20 @@ FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLog
// Seed the DB. Drain work after small batches so we don't deadlock on a
// scheduled compaction.
const size_t num_entries{provider.ConsumeIntegralInRange<size_t>(100, 5'000)};
const size_t num_entries{provider.ConsumeIntegralInRange<size_t>(100, 3'000)};
std::vector<uint16_t> keys;
keys.reserve(num_entries);
Oracle oracle;
constexpr size_t SEED_BATCH_SIZE{400};
for (size_t start{0}; start < num_entries; start += SEED_BATCH_SIZE) {
CDBBatch batch{db};
const size_t end{std::min(start + SEED_BATCH_SIZE, num_entries)};
for (size_t i{start}; i < end; ++i) {
const auto k{ConsumeKey(provider)};
batch.Write(k, MakeValue(k, ConsumeValueSize(provider)));
const auto size{ConsumeValueSize(provider)};
batch.Write(k, MakeValue(k, size));
keys.push_back(k);
oracle[k] = size;
}
det_env.DrainWork();
db.WriteBatch(batch, /*fSync=*/true);
@@ -414,40 +394,71 @@ FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLog
// Build query list from seeded and random keys.
const size_t num_queries{provider.ConsumeIntegralInRange<size_t>(1, 2'000)};
std::vector<ReadQuery> queries;
enum class ReadOp { Read, Exists, IteratorSeek };
std::vector<std::tuple<ReadOp, uint16_t>> queries;
queries.reserve(num_queries);
for (size_t i{0}; i < num_queries; ++i) {
const auto op{static_cast<ReadOp>(provider.ConsumeIntegralInRange<int>(0, 2))};
const auto op{provider.PickValueInArray({ReadOp::Read, ReadOp::Exists, ReadOp::IteratorSeek})};
const uint16_t key{provider.ConsumeBool()
? keys[provider.ConsumeIntegralInRange<size_t>(0, keys.size() - 1)]
: ConsumeKey(provider)};
queries.emplace_back(op, key);
}
// Baseline read on a single thread
FastRandomContext rng{ConsumeUInt256(provider)};
const Results baseline{RunReadQueries(db, queries, rng)};
ThreadPool pool{"dbfuzz"};
pool.Start(MAX_READ_WORKERS);
// Workers + main thread synchronize on the latch so all reads start together.
std::latch start_latch{static_cast<ptrdiff_t>(MAX_READ_WORKERS + 1)};
std::vector<std::function<Results()>> tasks(MAX_READ_WORKERS);
std::generate(tasks.begin(), tasks.end(), [&] {
return [&, seed = rng.rand256()]() -> Results {
std::vector<std::function<void()>> tasks(MAX_READ_WORKERS);
FastRandomContext rng{ConsumeUInt256(provider)};
std::ranges::generate(tasks, [&] {
return [&, seed = rng.rand256()] {
FastRandomContext thread_rng{seed};
std::vector<size_t> order(queries.size());
std::iota(order.begin(), order.end(), size_t{0});
std::ranges::shuffle(order, thread_rng);
std::vector<uint8_t> v;
std::string key_str;
start_latch.arrive_and_wait();
return RunReadQueries(db, queries, thread_rng);
const std::unique_ptr<CDBIterator> it{db.NewIterator()};
// Every read must agree with the oracle, the source of truth.
for (const auto i : order) {
const auto& [op, key] = queries[i];
switch (op) {
case ReadOp::Read:
if (const auto oit{oracle.find(key)}; oit != oracle.end()) {
assert(db.Read(key, v) && v == MakeValue(key, oit->second));
} else {
assert(!db.Read(key, v));
}
break;
case ReadOp::Exists:
assert(db.Exists(key) == oracle.contains(key));
break;
case ReadOp::IteratorSeek:
it->Seek(key);
// Skip the obfuscation metadata entry (a non-uint16_t key) if we land
// on it, so the result matches the oracle, which only tracks user keys.
if (it->Valid() && it->GetKey(key_str) && key_str == OBFUSCATION_KEY) it->Next();
if (const auto oit{oracle.lower_bound(key)}; oit != oracle.end()) {
assert(it->Valid());
uint16_t actual_key;
assert(it->GetKey(actual_key) && actual_key == oit->first);
assert(it->GetValue(v) && v == MakeValue(actual_key, oit->second));
} else {
assert(!it->Valid());
}
break;
}
}
};
});
auto futures{*Assert(pool.Submit(std::move(tasks)))};
auto futures{*Assert(g_read_pool.Submit(std::move(tasks)))};
// Release the workers and immediately run the queued compaction on this
// thread, so compaction races against the concurrent reads.
start_latch.arrive_and_wait();
det_env.DrainWork();
for (auto& fut : futures) assert(fut.get() == baseline);
for (auto& fut : futures) fut.get();
det_env.DrainWork();
}