
Reimplemented adaptive_mutex using Boost.Atomic.

adaptive_mutex now uses Boost.Atomic on all platforms that support
lock-free atomics with native waiting/notifying operations for unsigned
int, and falls back to pthreads on the other platforms. Windows always
uses Boost.Atomic.
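In short, the backend selection now reduces to the following check,
condensed from the diff below (BOOST_ATOMIC_INT_LOCK_FREE and
BOOST_ATOMIC_HAS_NATIVE_INT_WAIT_NOTIFY are Boost.Atomic capability
macros):

    #include <boost/atomic/capabilities.hpp>

    #if defined(BOOST_WINDOWS) || (BOOST_ATOMIC_INT_LOCK_FREE == 2) && (BOOST_ATOMIC_HAS_NATIVE_INT_WAIT_NOTIFY == 2)
    // Boost.Atomic-based implementation
    #define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_ATOMIC
    #else
    // pthread-based fallback
    #define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD
    #endif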

The new implementation spins with exponential backoff for a number of
attempts to acquire the lock and then blocks the thread. This should
reduce CPU load if the thread that owns the lock holds it for a long
time for some reason (which typically should not happen, since
adaptive_mutex is currently only used in threadsafe_queue, which doesn't
hold the lock for long unless the holder gets preempted).
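To illustrate the idea, here is a simplified sketch of a spin-then-block
lock. It is not the committed code: the real implementation also packs a
waiter count into the upper bits of the state word so that unlock() only
notifies when a waiter is actually blocked. The class name
spin_then_block_mutex is invented for this example, and a Boost.Atomic
version providing wait/notify and boost::atomics::thread_pause() is
assumed:

    #include <boost/memory_order.hpp>
    #include <boost/atomic/atomic.hpp>
    #include <boost/atomic/thread_pause.hpp>

    class spin_then_block_mutex
    {
        boost::atomics::atomic< unsigned int > m_locked{ 0u };

    public:
        void lock() noexcept
        {
            unsigned int pause_count = 1u;
            while (m_locked.exchange(1u, boost::memory_order::acquire) != 0u)
            {
                if (pause_count < 16u)
                {
                    // Spin with exponential backoff while the owner is likely to release soon
                    for (unsigned int i = 0u; i < pause_count; ++i)
                        boost::atomics::thread_pause();
                    pause_count += pause_count;
                }
                else
                {
                    // Backoff exhausted: block until unlock() wakes us up
                    m_locked.wait(1u, boost::memory_order::relaxed);
                }
            }
        }

        void unlock() noexcept
        {
            m_locked.store(0u, boost::memory_order::release);
            m_locked.notify_one(); // the sketch notifies unconditionally
        }
    };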

The change may be significant on POSIX systems, which previously always
used pthreads; the performance characteristics of pthread_mutex_t may
differ from those of the new Boost.Atomic-based implementation.
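Both backends keep the same minimal Lockable surface (lock(),
try_lock(), unlock()), so internal call sites such as threadsafe_queue
can keep using the mutex through ordinary guards. A minimal usage sketch
(the function and variable names are invented for illustration; the
header is an implementation detail and not intended for direct use by
library users):

    #include <mutex> // std::lock_guard works with any Lockable type
    #include <boost/log/detail/adaptive_mutex.hpp>

    static boost::log::aux::adaptive_mutex g_mutex;

    void do_protected_work()
    {
        std::lock_guard< boost::log::aux::adaptive_mutex > guard(g_mutex);
        // ... short critical section, as in threadsafe_queue ...
    }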
Andrey Semashev
2025-06-12 00:22:37 +03:00
parent 48a33d9eb0
commit fdcd3a233c


@@ -1,5 +1,5 @@
/*
- * Copyright Andrey Semashev 2007 - 2015.
+ * Copyright Andrey Semashev 2007 - 2025.
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE_1_0.txt or copy at
* http://www.boost.org/LICENSE_1_0.txt)
@@ -24,36 +24,19 @@
#ifndef BOOST_LOG_NO_THREADS
- #if defined(BOOST_THREAD_POSIX) // This one can be defined by users, so it should go first
- #define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD
- #elif defined(BOOST_WINDOWS)
- #define BOOST_LOG_ADAPTIVE_MUTEX_USE_WINAPI
- #elif defined(BOOST_HAS_PTHREADS)
- #define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD
+ #include <boost/atomic/capabilities.hpp>
+ #if defined(BOOST_WINDOWS) || (BOOST_ATOMIC_INT_LOCK_FREE == 2) && (BOOST_ATOMIC_HAS_NATIVE_INT_WAIT_NOTIFY == 2)
+ #define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_ATOMIC
+ #else
+ #define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD
#endif
- #if defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_WINAPI)
- #include <boost/log/detail/pause.hpp>
- #include <boost/winapi/thread.hpp>
- #include <boost/detail/interlocked.hpp>
- #if defined(__INTEL_COMPILER) || defined(_MSC_VER)
- # if defined(__INTEL_COMPILER)
- # define BOOST_LOG_COMPILER_BARRIER __memory_barrier()
- # elif defined(__clang__) // clang-win also defines _MSC_VER
- # define BOOST_LOG_COMPILER_BARRIER __atomic_signal_fence(__ATOMIC_SEQ_CST)
- # else
- extern "C" void _ReadWriteBarrier(void);
- # if defined(BOOST_MSVC)
- # pragma intrinsic(_ReadWriteBarrier)
- # endif
- # define BOOST_LOG_COMPILER_BARRIER _ReadWriteBarrier()
- # endif
- #elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
- # define BOOST_LOG_COMPILER_BARRIER __asm__ __volatile__("" : : : "memory")
- #endif
+ #if defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_ATOMIC)
+ #include <boost/memory_order.hpp>
+ #include <boost/atomic/atomic.hpp>
+ #include <boost/atomic/thread_pause.hpp>
#include <boost/log/detail/header.hpp>
namespace boost {
@@ -62,73 +45,97 @@ BOOST_LOG_OPEN_NAMESPACE
namespace aux {
- //! A mutex that performs spinning or thread yielding in case of contention
+ //! A mutex that performs spinning or thread blocking in case of contention
class adaptive_mutex
{
private:
- enum state
+ enum state : unsigned int
{
- initial_pause = 2,
- max_pause = 16
+ locked_bit = 1u,
+ waiters_one = 1u << 1u
};
- long m_State;
+ enum pause_constants : unsigned int
+ {
+ initial_pause = 1u,
+ max_pause = 16u
+ };
+ //! Mutex state. Lowest bit indicates whether the mutex is locked or not, all higher bits store the number of blocked waiters. See \c state enum.
+ boost::atomics::atomic< unsigned int > m_state;
public:
- adaptive_mutex() : m_State(0) {}
+ adaptive_mutex() noexcept : m_state(0u) {}
- bool try_lock()
+ // Non-copyable
+ adaptive_mutex(adaptive_mutex const&) = delete;
+ adaptive_mutex& operator= (adaptive_mutex const&) = delete;
+ bool try_lock() noexcept
{
- return (BOOST_INTERLOCKED_COMPARE_EXCHANGE(&m_State, 1L, 0L) == 0L);
+ unsigned int old_state = m_state.load(boost::memory_order::relaxed);
+ return (old_state & locked_bit) == 0u &&
+ m_state.compare_exchange_strong(old_state, old_state | locked_bit, boost::memory_order::acquire, boost::memory_order::relaxed);
}
- void lock()
+ void lock() noexcept
{
- #if defined(BOOST_LOG_AUX_PAUSE)
unsigned int pause_count = initial_pause;
- #endif
- while (!try_lock())
+ unsigned int waiter_added = 0u;
+ unsigned int old_state = m_state.load(boost::memory_order::relaxed);
+ while (true)
{
- #if defined(BOOST_LOG_AUX_PAUSE)
+ if ((old_state & locked_bit) == 0u)
+ {
+ unsigned int new_state = (old_state - waiter_added) | locked_bit;
+ if (m_state.compare_exchange_weak(old_state, new_state, boost::memory_order::acquire, boost::memory_order::relaxed))
+ break;
+ continue;
+ }
if (pause_count < max_pause)
{
- for (unsigned int i = 0; i < pause_count; ++i)
+ if (waiter_added != 0u)
{
- BOOST_LOG_AUX_PAUSE;
+ old_state = m_state.sub(waiters_one, boost::memory_order::relaxed);
+ waiter_added = 0u;
}
- pause_count += pause_count;
+ else
+ {
+ for (unsigned int i = 0u; i < pause_count; ++i)
+ {
+ boost::atomics::thread_pause();
+ }
+ pause_count += pause_count;
+ old_state = m_state.load(boost::memory_order::relaxed);
+ }
}
+ else if (waiter_added == 0u)
+ {
+ old_state = m_state.add(waiters_one, boost::memory_order::relaxed);
+ waiter_added = waiters_one;
+ }
else
{
- // Restart spinning after waking up this thread
- pause_count = initial_pause;
- boost::winapi::SwitchToThread();
+ old_state = m_state.wait(old_state, boost::memory_order::relaxed);
}
- #else
- boost::winapi::SwitchToThread();
- #endif
}
}
- void unlock()
+ void unlock() noexcept
{
- #if (defined(_M_IX86) || defined(_M_AMD64)) && defined(BOOST_LOG_COMPILER_BARRIER)
- BOOST_LOG_COMPILER_BARRIER;
- m_State = 0L;
- BOOST_LOG_COMPILER_BARRIER;
- #else
- BOOST_INTERLOCKED_EXCHANGE(&m_State, 0L);
- #endif
+ if (m_state.and_and_test(~static_cast< unsigned int >(locked_bit), boost::memory_order::release))
+ {
+ // If the resulting state is non-zero then there are blocked waiters
+ m_state.notify_one();
+ }
}
- // Non-copyable
- BOOST_DELETED_FUNCTION(adaptive_mutex(adaptive_mutex const&))
- BOOST_DELETED_FUNCTION(adaptive_mutex& operator= (adaptive_mutex const&))
};
- #undef BOOST_LOG_AUX_PAUSE
- #undef BOOST_LOG_COMPILER_BARRIER
} // namespace aux
BOOST_LOG_CLOSE_NAMESPACE // namespace log
@@ -137,7 +144,7 @@ BOOST_LOG_CLOSE_NAMESPACE // namespace log
#include <boost/log/detail/footer.hpp>
- #elif defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD)
+ #elif defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD)
#include <pthread.h>
#include <cerrno>
@@ -147,7 +154,7 @@ BOOST_LOG_CLOSE_NAMESPACE // namespace log
#include <boost/log/detail/header.hpp>
#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
- #define BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP
+ #define BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP
#endif
namespace boost {
@@ -156,37 +163,41 @@ BOOST_LOG_OPEN_NAMESPACE
namespace aux {
- //! A mutex that performs spinning or thread yielding in case of contention
+ //! A mutex that performs spinning or thread blocking in case of contention
class adaptive_mutex
{
private:
- pthread_mutex_t m_State;
+ pthread_mutex_t m_state;
public:
adaptive_mutex()
{
- #if defined(BOOST_LOG_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP)
+ #if defined(BOOST_LOG_AUX_ADAPTIVE_MUTEX_USE_PTHREAD_MUTEX_ADAPTIVE_NP)
pthread_mutexattr_t attrs;
pthread_mutexattr_init(&attrs);
pthread_mutexattr_settype(&attrs, PTHREAD_MUTEX_ADAPTIVE_NP);
- const int err = pthread_mutex_init(&m_State, &attrs);
+ const int err = pthread_mutex_init(&m_state, &attrs);
pthread_mutexattr_destroy(&attrs);
#else
- const int err = pthread_mutex_init(&m_State, NULL);
+ const int err = pthread_mutex_init(&m_state, nullptr);
#endif
if (BOOST_UNLIKELY(err != 0))
throw_system_error(err, "Failed to initialize an adaptive mutex", "adaptive_mutex::adaptive_mutex()", __FILE__, __LINE__);
}
+ // Non-copyable
+ adaptive_mutex(adaptive_mutex const&) = delete;
+ adaptive_mutex& operator= (adaptive_mutex const&) = delete;
~adaptive_mutex()
{
- BOOST_VERIFY(pthread_mutex_destroy(&m_State) == 0);
+ BOOST_VERIFY(pthread_mutex_destroy(&m_state) == 0);
}
bool try_lock()
{
- const int err = pthread_mutex_trylock(&m_State);
+ const int err = pthread_mutex_trylock(&m_state);
if (err == 0)
return true;
if (BOOST_UNLIKELY(err != EBUSY))
@@ -196,20 +207,16 @@ public:
void lock()
{
- const int err = pthread_mutex_lock(&m_State);
+ const int err = pthread_mutex_lock(&m_state);
if (BOOST_UNLIKELY(err != 0))
throw_system_error(err, "Failed to lock an adaptive mutex", "adaptive_mutex::lock()", __FILE__, __LINE__);
}
- void unlock()
+ void unlock() noexcept
{
- BOOST_VERIFY(pthread_mutex_unlock(&m_State) == 0);
+ BOOST_VERIFY(pthread_mutex_unlock(&m_state) == 0);
}
- // Non-copyable
- BOOST_DELETED_FUNCTION(adaptive_mutex(adaptive_mutex const&))
- BOOST_DELETED_FUNCTION(adaptive_mutex& operator= (adaptive_mutex const&))
private:
static BOOST_NOINLINE BOOST_LOG_NORETURN void throw_system_error(int err, const char* descr, const char* func, const char* file, int line)
{