From 41262cc4d53389ddadc59573e4eb246e085268ce Mon Sep 17 00:00:00 2001 From: Eugene Siegel Date: Thu, 5 Jun 2025 12:19:28 -0400 Subject: [PATCH] log: introduce LogRateLimiter, LogLimitStats, Status LogRateLimiter will be used to keep track of source locations and our current time-based logging window. It contains an unordered_map and a m_suppressions_active bool to track source locations. The map is keyed by std::source_location, so a custom Hash function (SourceLocationHasher) and custom KeyEqual function (SourceLocationEqual) is provided. SourceLocationHasher uses CSipHasher(0,0) under the hood to get a uniform distribution. A public Reset method is provided so that a scheduler (e.g. the "b-scheduler" thread) can periodically reset LogRateLimiter's state when the time window has elapsed. The LogRateLimiter::Consume method checks if we have enough available bytes in our rate limiting budget to log an additional string. It returns a Status enum that denotes the rate limiting status and can be used by the caller to emit a warning, skip logging, etc. The Status enum has three states: - UNSUPPRESSED (logging was successful) - NEWLY_SUPPRESSED (logging was succcesful, next log will be suppressed) - STILL_SUPPRESSED (logging was unsuccessful) LogLimitStats counts the available bytes left for logging per source location for the current logging window. It does not track actual source locations; it is used as a value in m_source_locations. Also exposes a SuppressionsActive() method so the logger can use that in a later commit to prefix [*] to logs whenenever suppressions are active. Co-Authored-By: Niklas Gogge Co-Authored-By: stickies-v Github-Pull: #32604 Rebased-From: afb9e39ec5552e598a5febaa81820d5509b7c5d2 --- src/logging.cpp | 55 ++++++++++++++++++++++ src/logging.h | 95 ++++++++++++++++++++++++++++++++++++++ src/test/logging_tests.cpp | 77 ++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+) diff --git a/src/logging.cpp b/src/logging.cpp index 5f055566ef5..eca6eac6727 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -367,6 +367,30 @@ static size_t MemUsage(const BCLog::Logger::BufferedLog& buflog) return buflog.str.size() + buflog.logging_function.size() + buflog.source_file.size() + buflog.threadname.size() + memusage::MallocUsage(sizeof(memusage::list_node)); } +BCLog::LogRateLimiter::LogRateLimiter( + SchedulerFunction scheduler_func, + uint64_t max_bytes, + std::chrono::seconds reset_window) : m_max_bytes{max_bytes}, m_reset_window{reset_window} +{ + scheduler_func([this] { Reset(); }, reset_window); +} + +BCLog::LogRateLimiter::Status BCLog::LogRateLimiter::Consume( + const std::source_location& source_loc, + const std::string& str) +{ + StdLockGuard scoped_lock(m_mutex); + auto& counter{m_source_locations.try_emplace(source_loc, m_max_bytes).first->second}; + Status status{counter.GetDroppedBytes() > 0 ? Status::STILL_SUPPRESSED : Status::UNSUPPRESSED}; + + if (!counter.Consume(str.size()) && status == Status::UNSUPPRESSED) { + status = Status::NEWLY_SUPPRESSED; + m_suppression_active = true; + } + + return status; +} + void BCLog::Logger::FormatLogStrInPlace(std::string& str, BCLog::LogFlags category, BCLog::Level level, std::string_view source_file, int source_line, std::string_view logging_function, std::string_view threadname, SystemClock::time_point now, std::chrono::seconds mocktime) const { if (!str.ends_with('\n')) str.push_back('\n'); @@ -492,6 +516,37 @@ void BCLog::Logger::ShrinkDebugFile() fclose(file); } +void BCLog::LogRateLimiter::Reset() +{ + decltype(m_source_locations) source_locations; + { + StdLockGuard scoped_lock(m_mutex); + source_locations.swap(m_source_locations); + m_suppression_active = false; + } + for (const auto& [source_loc, counter] : source_locations) { + uint64_t dropped_bytes{counter.GetDroppedBytes()}; + if (dropped_bytes == 0) continue; + LogPrintLevel_( + LogFlags::ALL, Level::Info, + "Restarting logging from %s:%d (%s): %d bytes were dropped during the last %ss.\n", + source_loc.file_name(), source_loc.line(), source_loc.function_name(), + dropped_bytes, Ticks(m_reset_window)); + } +} + +bool BCLog::LogLimitStats::Consume(uint64_t bytes) +{ + if (bytes > m_available_bytes) { + m_dropped_bytes += bytes; + m_available_bytes = 0; + return false; + } + + m_available_bytes -= bytes; + return true; +} + bool BCLog::Logger::SetLogLevel(std::string_view level_str) { const auto level = GetLogLevel(level_str); diff --git a/src/logging.h b/src/logging.h index fdc12c79b32..7411d33f5f0 100644 --- a/src/logging.h +++ b/src/logging.h @@ -6,6 +6,7 @@ #ifndef BITCOIN_LOGGING_H #define BITCOIN_LOGGING_H +#include #include #include #include @@ -14,11 +15,14 @@ #include #include +#include #include #include #include +#include #include #include +#include #include static const bool DEFAULT_LOGTIMEMICROS = false; @@ -31,6 +35,24 @@ extern const char * const DEFAULT_DEBUGLOGFILE; extern bool fLogIPs; +struct SourceLocationEqual { + bool operator()(const std::source_location& lhs, const std::source_location& rhs) const noexcept + { + return lhs.line() == rhs.line() && std::string_view(lhs.file_name()) == std::string_view(rhs.file_name()); + } +}; + +struct SourceLocationHasher { + size_t operator()(const std::source_location& s) const noexcept + { + // Use CSipHasher(0, 0) as a simple way to get uniform distribution. + return static_cast(CSipHasher(0, 0) + .Write(std::hash{}(s.file_name())) + .Write(s.line()) + .Finalize()); + } +}; + struct LogCategory { std::string category; bool active; @@ -82,6 +104,79 @@ namespace BCLog { }; constexpr auto DEFAULT_LOG_LEVEL{Level::Debug}; constexpr size_t DEFAULT_MAX_LOG_BUFFER{1'000'000}; // buffer up to 1MB of log data prior to StartLogging + constexpr uint64_t RATELIMIT_MAX_BYTES{1024 * 1024}; // maximum number of bytes that can be logged within one window + + //! Keeps track of an individual source location and how many available bytes are left for logging from it. + class LogLimitStats + { + private: + //! Remaining bytes in the current window interval. + uint64_t m_available_bytes; + //! Number of bytes that were not consumed within the current window. + uint64_t m_dropped_bytes{0}; + + public: + LogLimitStats(uint64_t max_bytes) : m_available_bytes{max_bytes} {} + //! Consume bytes from the window if enough bytes are available. + //! + //! Returns whether enough bytes were available. + bool Consume(uint64_t bytes); + + uint64_t GetAvailableBytes() const + { + return m_available_bytes; + } + + uint64_t GetDroppedBytes() const + { + return m_dropped_bytes; + } + }; + + /** + * Fixed window rate limiter for logging. + */ + class LogRateLimiter + { + private: + mutable StdMutex m_mutex; + + //! Counters for each source location that has attempted to log something. + std::unordered_map m_source_locations GUARDED_BY(m_mutex); + //! True if at least one log location is suppressed. Cached view on m_source_locations for performance reasons. + std::atomic m_suppression_active{false}; + + public: + using SchedulerFunction = std::function, std::chrono::milliseconds)>; + /** + * @param scheduler_func Callable object used to schedule resetting the window. The first + * parameter is the function to be executed, and the second is the + * reset_window interval. + * @param max_bytes Maximum number of bytes that can be logged for each source + * location. + * @param reset_window Time window after which the byte counters are reset. + */ + LogRateLimiter(SchedulerFunction scheduler_func, uint64_t max_bytes, std::chrono::seconds reset_window); + //! Maximum number of bytes logged per location per window. + const uint64_t m_max_bytes; + //! Interval after which the window is reset. + const std::chrono::seconds m_reset_window; + //! Suppression status of a source log location. + enum class Status { + UNSUPPRESSED, // string fits within the limit + NEWLY_SUPPRESSED, // suppression has started since this string + STILL_SUPPRESSED, // suppression is still ongoing + }; + //! Consumes `source_loc`'s available bytes corresponding to the size of the (formatted) + //! `str` and returns its status. + [[nodiscard]] Status Consume( + const std::source_location& source_loc, + const std::string& str) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + //! Resets all usage to zero. Called periodically by the scheduler. + void Reset() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + //! Returns true if any log locations are currently being suppressed. + bool SuppressionsActive() const { return m_suppression_active; } + }; class Logger { diff --git a/src/test/logging_tests.cpp b/src/test/logging_tests.cpp index 77ec81e5977..5d24ad771c2 100644 --- a/src/test/logging_tests.cpp +++ b/src/test/logging_tests.cpp @@ -5,11 +5,13 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -276,4 +278,79 @@ BOOST_FIXTURE_TEST_CASE(logging_Conf, LogSetup) } } +void MockForwardAndSync(CScheduler& scheduler, std::chrono::seconds duration) +{ + scheduler.MockForward(duration); + std::promise promise; + scheduler.scheduleFromNow([&promise] { promise.set_value(); }, 0ms); + promise.get_future().wait(); +} + +BOOST_AUTO_TEST_CASE(logging_log_rate_limiter) +{ + CScheduler scheduler{}; + scheduler.m_service_thread = std::thread([&scheduler] { scheduler.serviceQueue(); }); + uint64_t max_bytes{1024}; + auto reset_window{1min}; + auto sched_func = [&scheduler](auto func, auto window) { scheduler.scheduleEvery(std::move(func), window); }; + BCLog::LogRateLimiter limiter{sched_func, max_bytes, reset_window}; + + using Status = BCLog::LogRateLimiter::Status; + auto source_loc_1{std::source_location::current()}; + auto source_loc_2{std::source_location::current()}; + + // A fresh limiter should not have any suppressions + BOOST_CHECK(!limiter.SuppressionsActive()); + + // Resetting an unused limiter is fine + limiter.Reset(); + BOOST_CHECK(!limiter.SuppressionsActive()); + + // No suppression should happen until more than max_bytes have been consumed + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_1, std::string(max_bytes - 1, 'a')), Status::UNSUPPRESSED); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_1, "a"), Status::UNSUPPRESSED); + BOOST_CHECK(!limiter.SuppressionsActive()); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_1, "a"), Status::NEWLY_SUPPRESSED); + BOOST_CHECK(limiter.SuppressionsActive()); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_1, "a"), Status::STILL_SUPPRESSED); + BOOST_CHECK(limiter.SuppressionsActive()); + + // Location 2 should not be affected by location 1's suppression + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_2, std::string(max_bytes, 'a')), Status::UNSUPPRESSED); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_2, "a"), Status::NEWLY_SUPPRESSED); + BOOST_CHECK(limiter.SuppressionsActive()); + + // After reset_window time has passed, all suppressions should be cleared. + MockForwardAndSync(scheduler, reset_window); + + BOOST_CHECK(!limiter.SuppressionsActive()); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_1, std::string(max_bytes, 'a')), Status::UNSUPPRESSED); + BOOST_CHECK_EQUAL(limiter.Consume(source_loc_2, std::string(max_bytes, 'a')), Status::UNSUPPRESSED); + + scheduler.stop(); +} + +BOOST_AUTO_TEST_CASE(logging_log_limit_stats) +{ + BCLog::LogLimitStats counter{BCLog::RATELIMIT_MAX_BYTES}; + + // Check that counter gets initialized correctly. + BOOST_CHECK_EQUAL(counter.GetAvailableBytes(), BCLog::RATELIMIT_MAX_BYTES); + BOOST_CHECK_EQUAL(counter.GetDroppedBytes(), 0ull); + + const uint64_t MESSAGE_SIZE{512 * 1024}; + BOOST_CHECK(counter.Consume(MESSAGE_SIZE)); + BOOST_CHECK_EQUAL(counter.GetAvailableBytes(), BCLog::RATELIMIT_MAX_BYTES - MESSAGE_SIZE); + BOOST_CHECK_EQUAL(counter.GetDroppedBytes(), 0ull); + + BOOST_CHECK(counter.Consume(MESSAGE_SIZE)); + BOOST_CHECK_EQUAL(counter.GetAvailableBytes(), BCLog::RATELIMIT_MAX_BYTES - MESSAGE_SIZE * 2); + BOOST_CHECK_EQUAL(counter.GetDroppedBytes(), 0ull); + + // Consuming more bytes after already having consumed 1MB should fail. + BOOST_CHECK(!counter.Consume(500)); + BOOST_CHECK_EQUAL(counter.GetAvailableBytes(), 0ull); + BOOST_CHECK_EQUAL(counter.GetDroppedBytes(), 500ull); +} + BOOST_AUTO_TEST_SUITE_END()