From fdcd3a233c466e4bb51496460ae32c74b66825c8 Mon Sep 17 00:00:00 2001 From: Andrey Semashev Date: Thu, 12 Jun 2025 00:22:37 +0300 Subject: [PATCH] Reimplemented adaptive_mutex using Boost.Atomic. adaptive_mutex now uses Boost.Atomic on all platforms that support lock-free atomics native waiting/notifying operations for unsigned int and pthreads on the other platforms. Windows always uses Boost.Atomic. The new implementation blocks the thread after a number of attempts to acquire the lock with spinning backoff. This should reduce CPU load if for some reason the thread that owns the lock is holding it for a long time (which typically should not happen, since adaptive_mutex is currently only used in threadsafe_queue, which doesn't hold the lock for too long, unless the holder gets preempted). The change may be significant on POSIX systems, which previously always used pthreads, and performance characteristics of pthread_mutex_t could have been different from our Boost.Atomic-based implementation. --- include/boost/log/detail/adaptive_mutex.hpp | 165 ++++++++++---------- 1 file changed, 86 insertions(+), 79 deletions(-) diff --git a/include/boost/log/detail/adaptive_mutex.hpp b/include/boost/log/detail/adaptive_mutex.hpp index fa06a0b..e642a2a 100644 --- a/include/boost/log/detail/adaptive_mutex.hpp +++ b/include/boost/log/detail/adaptive_mutex.hpp @@ -1,5 +1,5 @@ /* - * Copyright Andrey Semashev 2007 - 2015. + * Copyright Andrey Semashev 2007 - 2025. * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE_1_0.txt or copy at * http://www.boost.org/LICENSE_1_0.txt) @@ -24,36 +24,19 @@ #ifndef BOOST_LOG_NO_THREADS -#if defined(BOOST_THREAD_POSIX) // This one can be defined by users, so it should go first -#define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD -#elif defined(BOOST_WINDOWS) -#define BOOST_LOG_ADAPTIVE_MUTEX_USE_WINAPI -#elif defined(BOOST_HAS_PTHREADS) -#define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD +#include + +#if defined(BOOST_WINDOWS) || (BOOST_ATOMIC_INT_LOCK_FREE == 2) && (BOOST_ATOMIC_HAS_NATIVE_INT_WAIT_NOTIFY == 2) +#define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_ATOMIC +#else +#define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD #endif -#if defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_WINAPI) - -#include -#include -#include - -#if defined(__INTEL_COMPILER) || defined(_MSC_VER) -# if defined(__INTEL_COMPILER) -# define BOOST_LOG_COMPILER_BARRIER __memory_barrier() -# elif defined(__clang__) // clang-win also defines _MSC_VER -# define BOOST_LOG_COMPILER_BARRIER __atomic_signal_fence(__ATOMIC_SEQ_CST) -# else -extern "C" void _ReadWriteBarrier(void); -# if defined(BOOST_MSVC) -# pragma intrinsic(_ReadWriteBarrier) -# endif -# define BOOST_LOG_COMPILER_BARRIER _ReadWriteBarrier() -# endif -#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) -# define BOOST_LOG_COMPILER_BARRIER __asm__ __volatile__("" : : : "memory") -#endif +#if defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_ATOMIC) +#include +#include +#include #include namespace boost { @@ -62,73 +45,97 @@ BOOST_LOG_OPEN_NAMESPACE namespace aux { -//! A mutex that performs spinning or thread yielding in case of contention +//! A mutex that performs spinning or thread blocking in case of contention class adaptive_mutex { private: - enum state + enum state : unsigned int { - initial_pause = 2, - max_pause = 16 + locked_bit = 1u, + waiters_one = 1u << 1u }; - long m_State; + enum pause_constants : unsigned int + { + initial_pause = 1u, + max_pause = 16u + }; + + //! Mutex state. Lowest bit indicates whether the mutex is locked or not, all higher bits store the number of blocked waiters. See \c state enum. + boost::atomics::atomic< unsigned int > m_state; public: - adaptive_mutex() : m_State(0) {} + adaptive_mutex() noexcept : m_state(0u) {} - bool try_lock() + // Non-copyable + adaptive_mutex(adaptive_mutex const&) = delete; + adaptive_mutex& operator= (adaptive_mutex const&) = delete; + + bool try_lock() noexcept { - return (BOOST_INTERLOCKED_COMPARE_EXCHANGE(&m_State, 1L, 0L) == 0L); + unsigned int old_state = m_state.load(boost::memory_order::relaxed); + return (old_state & locked_bit) == 0u && + m_state.compare_exchange_strong(old_state, old_state | locked_bit, boost::memory_order::acquire, boost::memory_order::relaxed); } - void lock() + void lock() noexcept { -#if defined(BOOST_LOG_AUX_PAUSE) unsigned int pause_count = initial_pause; -#endif - while (!try_lock()) + unsigned int waiter_added = 0u; + unsigned int old_state = m_state.load(boost::memory_order::relaxed); + while (true) { -#if defined(BOOST_LOG_AUX_PAUSE) + if ((old_state & locked_bit) == 0u) + { + unsigned int new_state = (old_state - waiter_added) | locked_bit; + if (m_state.compare_exchange_weak(old_state, new_state, boost::memory_order::acquire, boost::memory_order::relaxed)) + break; + + continue; + } + if (pause_count < max_pause) { - for (unsigned int i = 0; i < pause_count; ++i) + if (waiter_added != 0u) { - BOOST_LOG_AUX_PAUSE; + old_state = m_state.sub(waiters_one, boost::memory_order::relaxed); + waiter_added = 0u; } - pause_count += pause_count; + else + { + for (unsigned int i = 0u; i < pause_count; ++i) + { + boost::atomics::thread_pause(); + } + pause_count += pause_count; + + old_state = m_state.load(boost::memory_order::relaxed); + } + } + else if (waiter_added == 0u) + { + old_state = m_state.add(waiters_one, boost::memory_order::relaxed); + waiter_added = waiters_one; } else { // Restart spinning after waking up this thread pause_count = initial_pause; - boost::winapi::SwitchToThread(); + old_state = m_state.wait(old_state, boost::memory_order::relaxed); } -#else - boost::winapi::SwitchToThread(); -#endif } } - void unlock() + void unlock() noexcept { -#if (defined(_M_IX86) || defined(_M_AMD64)) && defined(BOOST_LOG_COMPILER_BARRIER) - BOOST_LOG_COMPILER_BARRIER; - m_State = 0L; - BOOST_LOG_COMPILER_BARRIER; -#else - BOOST_INTERLOCKED_EXCHANGE(&m_State, 0L); -#endif + if (m_state.and_and_test(~static_cast< unsigned int >(locked_bit), boost::memory_order::release)) + { + // If the resulting state is non-zero then there are blocked waiters + m_state.notify_one(); + } } - - // Non-copyable - BOOST_DELETED_FUNCTION(adaptive_mutex(adaptive_mutex const&)) - BOOST_DELETED_FUNCTION(adaptive_mutex& operator= (adaptive_mutex const&)) }; -#undef BOOST_LOG_AUX_PAUSE -#undef BOOST_LOG_COMPILER_BARRIER - } // namespace aux BOOST_LOG_CLOSE_NAMESPACE // namespace log @@ -137,7 +144,7 @@ BOOST_LOG_CLOSE_NAMESPACE // namespace log #include -#elif defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD) +#elif defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD) #include #include @@ -147,7 +154,7 @@ BOOST_LOG_CLOSE_NAMESPACE // namespace log #include #if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) -#define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP +#define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP #endif namespace boost { @@ -156,37 +163,41 @@ BOOST_LOG_OPEN_NAMESPACE namespace aux { -//! A mutex that performs spinning or thread yielding in case of contention +//! A mutex that performs spinning or thread blocking in case of contention class adaptive_mutex { private: - pthread_mutex_t m_State; + pthread_mutex_t m_state; public: adaptive_mutex() { -#if defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP) +#if defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP) pthread_mutexattr_t attrs; pthread_mutexattr_init(&attrs); pthread_mutexattr_settype(&attrs, PTHREAD_MUTEX_ADAPTIVE_NP); - const int err = pthread_mutex_init(&m_State, &attrs); + const int err = pthread_mutex_init(&m_state, &attrs); pthread_mutexattr_destroy(&attrs); #else - const int err = pthread_mutex_init(&m_State, NULL); + const int err = pthread_mutex_init(&m_state, nullptr); #endif if (BOOST_UNLIKELY(err != 0)) throw_system_error(err, "Failed to initialize an adaptive mutex", "adaptive_mutex::adaptive_mutex()", __FILE__, __LINE__); } + // Non-copyable + adaptive_mutex(adaptive_mutex const&) = delete; + adaptive_mutex& operator= (adaptive_mutex const&) = delete; + ~adaptive_mutex() { - BOOST_VERIFY(pthread_mutex_destroy(&m_State) == 0); + BOOST_VERIFY(pthread_mutex_destroy(&m_state) == 0); } bool try_lock() { - const int err = pthread_mutex_trylock(&m_State); + const int err = pthread_mutex_trylock(&m_state); if (err == 0) return true; if (BOOST_UNLIKELY(err != EBUSY)) @@ -196,20 +207,16 @@ public: void lock() { - const int err = pthread_mutex_lock(&m_State); + const int err = pthread_mutex_lock(&m_state); if (BOOST_UNLIKELY(err != 0)) throw_system_error(err, "Failed to lock an adaptive mutex", "adaptive_mutex::lock()", __FILE__, __LINE__); } - void unlock() + void unlock() noexcept { - BOOST_VERIFY(pthread_mutex_unlock(&m_State) == 0); + BOOST_VERIFY(pthread_mutex_unlock(&m_state) == 0); } - // Non-copyable - BOOST_DELETED_FUNCTION(adaptive_mutex(adaptive_mutex const&)) - BOOST_DELETED_FUNCTION(adaptive_mutex& operator= (adaptive_mutex const&)) - private: static BOOST_NOINLINE BOOST_LOG_NORETURN void throw_system_error(int err, const char* descr, const char* func, const char* file, int line) {