diff --git a/src/test/fuzz/dbwrapper.cpp b/src/test/fuzz/dbwrapper.cpp index 13fff136aee..784b696d816 100644 --- a/src/test/fuzz/dbwrapper.cpp +++ b/src/test/fuzz/dbwrapper.cpp @@ -4,12 +4,16 @@ #include #include +#include +#include #include #include #include #include #include #include +#include +#include #include #include @@ -18,12 +22,16 @@ #include #include #include +#include +#include +#include #include #include #include #include #include #include +#include #include namespace { @@ -57,29 +65,35 @@ class DeterministicEnv final : public leveldb::EnvWrapper void* arg; }; - std::deque m_queue; + Mutex m_mutex; + std::deque m_queue GUARDED_BY(m_mutex); public: explicit DeterministicEnv(leveldb::Env* base) : EnvWrapper(base) {} - void Schedule(WorkFunction function, void* arg) override + void Schedule(WorkFunction function, void* arg) override EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { + LOCK(m_mutex); m_queue.push_back({function, arg}); } /** Execute one pending background task. The task may schedule a * successor which is left pending for a later call. */ - bool RunOne() + bool RunOne() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { - if (m_queue.empty()) return false; - const Work work{m_queue.front()}; - m_queue.pop_front(); + Work work; + { + LOCK(m_mutex); + if (m_queue.empty()) return false; + work = m_queue.front(); + m_queue.pop_front(); + } work.function(work.arg); return true; } /** Execute pending background tasks until none remain. */ - void DrainWork() { while (RunOne()) {} } + void DrainWork() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { while (RunOne()) {} } }; constexpr size_t MAX_VALUE_LEN{4096}; @@ -152,6 +166,58 @@ void VerifyIterator(CDBWrapper& dbw, const Oracle& oracle, assert(oracle_it == oracle.end()); } +/** Maximum number of concurrent reader threads in dbwrapper_concurrent_reads. */ +constexpr size_t MAX_READ_WORKERS{16}; + +/** Build randomized DBParams from the fuzz input, shared by all targets. */ +DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env, + bool obfuscate, DBOptions options = {}) +{ + return DBParams{ + .path = "dbwrapper_fuzz", + .cache_bytes = provider.ConsumeIntegralInRange(64 << 10, 1_MiB), + .obfuscate = obfuscate, + .options = options, + .testing_env = testing_env, + .max_file_size = provider.ConsumeBool() + ? DBWRAPPER_MAX_FILE_SIZE + : provider.ConsumeIntegralInRange(1_MiB, 4_MiB), + }; +} + +/** A single read-only operation run concurrently in dbwrapper_concurrent_reads. */ +enum class ReadOp { Read, Exists, IteratorSeek }; +using ReadQuery = std::tuple; +using Results = std::vector>; + +Results RunReadQueries(CDBWrapper& db, const std::vector& queries, FastRandomContext& rng) +{ + std::vector order(queries.size()); + std::iota(order.begin(), order.end(), size_t{0}); + std::shuffle(order.begin(), order.end(), rng); + + Results results(queries.size()); + for (const auto i : order) { + const auto& [op, key] = queries[i]; + std::string v; + switch (op) { + case ReadOp::Read: + if (db.Read(key, v)) results[i] = std::move(v); + break; + case ReadOp::Exists: + if (db.Exists(key)) results[i] = std::move(v); + break; + case ReadOp::IteratorSeek: { + const std::unique_ptr it{db.NewIterator()}; + it->Seek(key); + if (it->Valid() && it->GetValue(v)) results[i] = std::move(v); + break; + } + } + } + return results; +} + template void TestDbWrapper(FuzzedDataProvider& provider, leveldb::Env* testing_env, @@ -164,16 +230,7 @@ void TestDbWrapper(FuzzedDataProvider& provider, const bool obfuscate{provider.ConsumeBool()}; const auto make_db{[&](DBOptions options = {}) { - return std::make_unique(DBParams{ - .path = "dbwrapper_fuzz", - .cache_bytes = provider.ConsumeIntegralInRange(64 << 10, 1_MiB), - .obfuscate = obfuscate, - .options = options, - .testing_env = testing_env, - .max_file_size = provider.ConsumeBool() - ? DBWRAPPER_MAX_FILE_SIZE - : provider.ConsumeIntegralInRange(1_MiB, 4_MiB), - }); + return std::make_unique(ConsumeDBParams(provider, testing_env, obfuscate, options)); }}; std::unique_ptr dbw{make_db()}; @@ -323,3 +380,74 @@ FUZZ_TARGET(dbwrapper_threaded, .init = [] { static auto setup{MakeNoLogFileCont /*run_one=*/[] { return false; }, /*allow_force_compact=*/true); } + +FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; }) +{ + SeedRandomStateForTest(SeedRand::ZEROS); + + FuzzedDataProvider provider{buffer.data(), buffer.size()}; + + const auto memenv{std::unique_ptr{leveldb::NewMemEnv(leveldb::Env::Default())}}; + DeterministicEnv det_env{memenv.get()}; + + CDBWrapper db{ConsumeDBParams(provider, &det_env, /*obfuscate=*/provider.ConsumeBool())}; + + // Seed the DB. Drain work after small batches so we don't deadlock on a + // scheduled compaction. + const size_t num_entries{provider.ConsumeIntegralInRange(100, 5'000)}; + std::vector keys; + keys.reserve(num_entries); + constexpr size_t SEED_BATCH_SIZE{400}; + for (size_t start{0}; start < num_entries; start += SEED_BATCH_SIZE) { + CDBBatch batch{db}; + const size_t end{std::min(start + SEED_BATCH_SIZE, num_entries)}; + for (size_t i{start}; i < end; ++i) { + const auto k{ConsumeKey(provider)}; + batch.Write(k, MakeValue(k, ConsumeValueSize(provider))); + keys.push_back(k); + } + det_env.DrainWork(); + db.WriteBatch(batch, /*fSync=*/true); + } + + while (provider.ConsumeBool() && det_env.RunOne()) {} + + // Build query list from seeded and random keys. + const size_t num_queries{provider.ConsumeIntegralInRange(1, 2'000)}; + std::vector queries; + queries.reserve(num_queries); + for (size_t i{0}; i < num_queries; ++i) { + const auto op{static_cast(provider.ConsumeIntegralInRange(0, 2))}; + const uint16_t key{provider.ConsumeBool() + ? keys[provider.ConsumeIntegralInRange(0, keys.size() - 1)] + : ConsumeKey(provider)}; + queries.emplace_back(op, key); + } + + // Baseline read on a single thread + FastRandomContext rng{ConsumeUInt256(provider)}; + const Results baseline{RunReadQueries(db, queries, rng)}; + + ThreadPool pool{"dbfuzz"}; + pool.Start(MAX_READ_WORKERS); + + // Workers + main thread synchronize on the latch so all reads start together. + std::latch start_latch{static_cast(MAX_READ_WORKERS + 1)}; + std::vector> tasks(MAX_READ_WORKERS); + std::generate(tasks.begin(), tasks.end(), [&] { + return [&, seed = rng.rand256()]() -> Results { + FastRandomContext thread_rng{seed}; + start_latch.arrive_and_wait(); + return RunReadQueries(db, queries, thread_rng); + }; + }); + auto futures{*Assert(pool.Submit(std::move(tasks)))}; + + // Release the workers and immediately run the queued compaction on this + // thread, so compaction races against the concurrent reads. + start_latch.arrive_and_wait(); + det_env.DrainWork(); + + for (auto& fut : futures) assert(fut.get() == baseline); + det_env.DrainWork(); +}