diff --git a/src/test/fuzz/dbwrapper.cpp b/src/test/fuzz/dbwrapper.cpp index 784b696d816..1b5cd7b025d 100644 --- a/src/test/fuzz/dbwrapper.cpp +++ b/src/test/fuzz/dbwrapper.cpp @@ -167,7 +167,16 @@ void VerifyIterator(CDBWrapper& dbw, const Oracle& oracle, } /** Maximum number of concurrent reader threads in dbwrapper_concurrent_reads. */ -constexpr size_t MAX_READ_WORKERS{16}; +constexpr size_t MAX_READ_WORKERS{8}; + +ThreadPool g_read_pool{"dbfuzz"}; +Mutex g_read_pool_mutex; + +void StartReadPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex) +{ + LOCK(g_read_pool_mutex); + if (!g_read_pool.WorkersCount()) g_read_pool.Start(MAX_READ_WORKERS); +} /** Build randomized DBParams from the fuzz input, shared by all targets. */ DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env, @@ -185,39 +194,6 @@ DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env }; } -/** A single read-only operation run concurrently in dbwrapper_concurrent_reads. */ -enum class ReadOp { Read, Exists, IteratorSeek }; -using ReadQuery = std::tuple; -using Results = std::vector>; - -Results RunReadQueries(CDBWrapper& db, const std::vector& queries, FastRandomContext& rng) -{ - std::vector order(queries.size()); - std::iota(order.begin(), order.end(), size_t{0}); - std::shuffle(order.begin(), order.end(), rng); - - Results results(queries.size()); - for (const auto i : order) { - const auto& [op, key] = queries[i]; - std::string v; - switch (op) { - case ReadOp::Read: - if (db.Read(key, v)) results[i] = std::move(v); - break; - case ReadOp::Exists: - if (db.Exists(key)) results[i] = std::move(v); - break; - case ReadOp::IteratorSeek: { - const std::unique_ptr it{db.NewIterator()}; - it->Seek(key); - if (it->Valid() && it->GetValue(v)) results[i] = std::move(v); - break; - } - } - } - return results; -} - template void TestDbWrapper(FuzzedDataProvider& provider, leveldb::Env* testing_env, @@ -381,8 +357,9 @@ FUZZ_TARGET(dbwrapper_threaded, .init = [] { static auto setup{MakeNoLogFileCont /*allow_force_compact=*/true); } -FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; }) +FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; }) EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex) { + StartReadPoolIfNeeded(); SeedRandomStateForTest(SeedRand::ZEROS); FuzzedDataProvider provider{buffer.data(), buffer.size()}; @@ -394,17 +371,20 @@ FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLog // Seed the DB. Drain work after small batches so we don't deadlock on a // scheduled compaction. - const size_t num_entries{provider.ConsumeIntegralInRange(100, 5'000)}; + const size_t num_entries{provider.ConsumeIntegralInRange(100, 3'000)}; std::vector keys; keys.reserve(num_entries); + Oracle oracle; constexpr size_t SEED_BATCH_SIZE{400}; for (size_t start{0}; start < num_entries; start += SEED_BATCH_SIZE) { CDBBatch batch{db}; const size_t end{std::min(start + SEED_BATCH_SIZE, num_entries)}; for (size_t i{start}; i < end; ++i) { const auto k{ConsumeKey(provider)}; - batch.Write(k, MakeValue(k, ConsumeValueSize(provider))); + const auto size{ConsumeValueSize(provider)}; + batch.Write(k, MakeValue(k, size)); keys.push_back(k); + oracle[k] = size; } det_env.DrainWork(); db.WriteBatch(batch, /*fSync=*/true); @@ -414,40 +394,71 @@ FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLog // Build query list from seeded and random keys. const size_t num_queries{provider.ConsumeIntegralInRange(1, 2'000)}; - std::vector queries; + enum class ReadOp { Read, Exists, IteratorSeek }; + std::vector> queries; queries.reserve(num_queries); for (size_t i{0}; i < num_queries; ++i) { - const auto op{static_cast(provider.ConsumeIntegralInRange(0, 2))}; + const auto op{provider.PickValueInArray({ReadOp::Read, ReadOp::Exists, ReadOp::IteratorSeek})}; const uint16_t key{provider.ConsumeBool() ? keys[provider.ConsumeIntegralInRange(0, keys.size() - 1)] : ConsumeKey(provider)}; queries.emplace_back(op, key); } - // Baseline read on a single thread - FastRandomContext rng{ConsumeUInt256(provider)}; - const Results baseline{RunReadQueries(db, queries, rng)}; - - ThreadPool pool{"dbfuzz"}; - pool.Start(MAX_READ_WORKERS); // Workers + main thread synchronize on the latch so all reads start together. std::latch start_latch{static_cast(MAX_READ_WORKERS + 1)}; - std::vector> tasks(MAX_READ_WORKERS); - std::generate(tasks.begin(), tasks.end(), [&] { - return [&, seed = rng.rand256()]() -> Results { + std::vector> tasks(MAX_READ_WORKERS); + FastRandomContext rng{ConsumeUInt256(provider)}; + std::ranges::generate(tasks, [&] { + return [&, seed = rng.rand256()] { FastRandomContext thread_rng{seed}; + std::vector order(queries.size()); + std::iota(order.begin(), order.end(), size_t{0}); + std::ranges::shuffle(order, thread_rng); + std::vector v; + std::string key_str; start_latch.arrive_and_wait(); - return RunReadQueries(db, queries, thread_rng); + const std::unique_ptr it{db.NewIterator()}; + // Every read must agree with the oracle, the source of truth. + for (const auto i : order) { + const auto& [op, key] = queries[i]; + switch (op) { + case ReadOp::Read: + if (const auto oit{oracle.find(key)}; oit != oracle.end()) { + assert(db.Read(key, v) && v == MakeValue(key, oit->second)); + } else { + assert(!db.Read(key, v)); + } + break; + case ReadOp::Exists: + assert(db.Exists(key) == oracle.contains(key)); + break; + case ReadOp::IteratorSeek: + it->Seek(key); + // Skip the obfuscation metadata entry (a non-uint16_t key) if we land + // on it, so the result matches the oracle, which only tracks user keys. + if (it->Valid() && it->GetKey(key_str) && key_str == OBFUSCATION_KEY) it->Next(); + if (const auto oit{oracle.lower_bound(key)}; oit != oracle.end()) { + assert(it->Valid()); + uint16_t actual_key; + assert(it->GetKey(actual_key) && actual_key == oit->first); + assert(it->GetValue(v) && v == MakeValue(actual_key, oit->second)); + } else { + assert(!it->Valid()); + } + break; + } + } }; }); - auto futures{*Assert(pool.Submit(std::move(tasks)))}; + auto futures{*Assert(g_read_pool.Submit(std::move(tasks)))}; // Release the workers and immediately run the queued compaction on this // thread, so compaction races against the concurrent reads. start_latch.arrive_and_wait(); det_env.DrainWork(); - for (auto& fut : futures) assert(fut.get() == baseline); + for (auto& fut : futures) fut.get(); det_env.DrainWork(); }