fuzz: target concurrent leveldb reads

This commit is contained in:
Andrew Toth
2026-06-02 09:45:32 -04:00
parent 6609088fe6
commit 8cb8653a22

View File

@@ -4,12 +4,16 @@
#include <dbwrapper.h>
#include <compat/byteswap.h>
#include <random.h>
#include <sync.h>
#include <test/fuzz/FuzzedDataProvider.h>
#include <test/fuzz/fuzz.h>
#include <test/fuzz/util.h>
#include <test/util/random.h>
#include <test/util/setup_common.h>
#include <util/byte_units.h>
#include <util/check.h>
#include <util/threadpool.h>
#include <leveldb/env.h>
#include <leveldb/helpers/memenv/memenv.h>
@@ -18,12 +22,16 @@
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <latch>
#include <map>
#include <memory>
#include <numeric>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <vector>
namespace {
@@ -57,29 +65,35 @@ class DeterministicEnv final : public leveldb::EnvWrapper
void* arg;
};
std::deque<Work> m_queue;
Mutex m_mutex;
std::deque<Work> m_queue GUARDED_BY(m_mutex);
public:
explicit DeterministicEnv(leveldb::Env* base) : EnvWrapper(base) {}
void Schedule(WorkFunction function, void* arg) override
void Schedule(WorkFunction function, void* arg) override EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
m_queue.push_back({function, arg});
}
/** Execute one pending background task. The task may schedule a
* successor which is left pending for a later call. */
bool RunOne()
bool RunOne() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
if (m_queue.empty()) return false;
const Work work{m_queue.front()};
m_queue.pop_front();
Work work;
{
LOCK(m_mutex);
if (m_queue.empty()) return false;
work = m_queue.front();
m_queue.pop_front();
}
work.function(work.arg);
return true;
}
/** Execute pending background tasks until none remain. */
void DrainWork() { while (RunOne()) {} }
void DrainWork() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { while (RunOne()) {} }
};
constexpr size_t MAX_VALUE_LEN{4096};
@@ -152,6 +166,9 @@ void VerifyIterator(CDBWrapper& dbw, const Oracle& oracle,
assert(oracle_it == oracle.end());
}
/** Maximum number of concurrent reader threads in dbwrapper_concurrent_reads. */
constexpr size_t MAX_READ_WORKERS{16};
/** Build randomized DBParams from the fuzz input, shared by all targets. */
DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env,
bool obfuscate, DBOptions options = {})
@@ -168,6 +185,39 @@ DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env
};
}
/** A single read-only operation run concurrently in dbwrapper_concurrent_reads. */
enum class ReadOp { Read, Exists, IteratorSeek };
using ReadQuery = std::tuple<ReadOp, uint16_t>;
using Results = std::vector<std::optional<std::string>>;
Results RunReadQueries(CDBWrapper& db, const std::vector<ReadQuery>& queries, FastRandomContext& rng)
{
std::vector<size_t> order(queries.size());
std::iota(order.begin(), order.end(), size_t{0});
std::shuffle(order.begin(), order.end(), rng);
Results results(queries.size());
for (const auto i : order) {
const auto& [op, key] = queries[i];
std::string v;
switch (op) {
case ReadOp::Read:
if (db.Read(key, v)) results[i] = std::move(v);
break;
case ReadOp::Exists:
if (db.Exists(key)) results[i] = std::move(v);
break;
case ReadOp::IteratorSeek: {
const std::unique_ptr<CDBIterator> it{db.NewIterator()};
it->Seek(key);
if (it->Valid() && it->GetValue(v)) results[i] = std::move(v);
break;
}
}
}
return results;
}
template <typename DrainWorkFn, typename RunOneFn>
void TestDbWrapper(FuzzedDataProvider& provider,
leveldb::Env* testing_env,
@@ -330,3 +380,74 @@ FUZZ_TARGET(dbwrapper_threaded, .init = [] { static auto setup{MakeNoLogFileCont
/*run_one=*/[] { return false; },
/*allow_force_compact=*/true);
}
FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; })
{
SeedRandomStateForTest(SeedRand::ZEROS);
FuzzedDataProvider provider{buffer.data(), buffer.size()};
const auto memenv{std::unique_ptr<leveldb::Env>{leveldb::NewMemEnv(leveldb::Env::Default())}};
DeterministicEnv det_env{memenv.get()};
CDBWrapper db{ConsumeDBParams(provider, &det_env, /*obfuscate=*/provider.ConsumeBool())};
// Seed the DB. Drain work after small batches so we don't deadlock on a
// scheduled compaction.
const size_t num_entries{provider.ConsumeIntegralInRange<size_t>(100, 5'000)};
std::vector<uint16_t> keys;
keys.reserve(num_entries);
constexpr size_t SEED_BATCH_SIZE{400};
for (size_t start{0}; start < num_entries; start += SEED_BATCH_SIZE) {
CDBBatch batch{db};
const size_t end{std::min(start + SEED_BATCH_SIZE, num_entries)};
for (size_t i{start}; i < end; ++i) {
const auto k{ConsumeKey(provider)};
batch.Write(k, MakeValue(k, ConsumeValueSize(provider)));
keys.push_back(k);
}
det_env.DrainWork();
db.WriteBatch(batch, /*fSync=*/true);
}
while (provider.ConsumeBool() && det_env.RunOne()) {}
// Build query list from seeded and random keys.
const size_t num_queries{provider.ConsumeIntegralInRange<size_t>(1, 2'000)};
std::vector<ReadQuery> queries;
queries.reserve(num_queries);
for (size_t i{0}; i < num_queries; ++i) {
const auto op{static_cast<ReadOp>(provider.ConsumeIntegralInRange<int>(0, 2))};
const uint16_t key{provider.ConsumeBool()
? keys[provider.ConsumeIntegralInRange<size_t>(0, keys.size() - 1)]
: ConsumeKey(provider)};
queries.emplace_back(op, key);
}
// Baseline read on a single thread
FastRandomContext rng{ConsumeUInt256(provider)};
const Results baseline{RunReadQueries(db, queries, rng)};
ThreadPool pool{"dbfuzz"};
pool.Start(MAX_READ_WORKERS);
// Workers + main thread synchronize on the latch so all reads start together.
std::latch start_latch{static_cast<ptrdiff_t>(MAX_READ_WORKERS + 1)};
std::vector<std::function<Results()>> tasks(MAX_READ_WORKERS);
std::generate(tasks.begin(), tasks.end(), [&] {
return [&, seed = rng.rand256()]() -> Results {
FastRandomContext thread_rng{seed};
start_latch.arrive_and_wait();
return RunReadQueries(db, queries, thread_rng);
};
});
auto futures{*Assert(pool.Submit(std::move(tasks)))};
// Release the workers and immediately run the queued compaction on this
// thread, so compaction races against the concurrent reads.
start_latch.arrive_and_wait();
det_env.DrainWork();
for (auto& fut : futures) assert(fut.get() == baseline);
det_env.DrainWork();
}