diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 5bde49be..ac7e05b4 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -219,6 +219,7 @@ public: detail::terminated_hook terminated_hook_{}; detail::wait_hook wait_hook_{}; detail::worker_hook worker_hook_{}; + std::atomic< context * > remote_nxt_{ nullptr }; std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() }; typedef intrusive::list< diff --git a/include/boost/fiber/detail/config.hpp b/include/boost/fiber/detail/config.hpp index 5def61bd..047e6ef6 100644 --- a/include/boost/fiber/detail/config.hpp +++ b/include/boost/fiber/detail/config.hpp @@ -44,5 +44,4 @@ static constexpr std::size_t cache_alignment{ 64 }; static constexpr std::size_t cacheline_length{ 64 }; - #endif // BOOST_FIBERS_DETAIL_CONFIG_H diff --git a/include/boost/fiber/detail/context_mpsc_queue.hpp b/include/boost/fiber/detail/context_mpsc_queue.hpp new file mode 100644 index 00000000..44a1b6b1 --- /dev/null +++ b/include/boost/fiber/detail/context_mpsc_queue.hpp @@ -0,0 +1,96 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +// based on Dmitry Vyukov's intrusive MPSC queue +// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue + +#ifndef BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H +#define BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H + +#include +#include +#include + +#include +#include + +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +// a MPSC queue +// multiple threads push ready fibers (belonging to local scheduler) +// (thread) local scheduler pops fibers +class context_mpsc_queue { +private: + // not default constructor for context - use aligned_storage instead + alignas(cache_alignment) std::aligned_storage< sizeof( context), alignof( context) >::type storage_{}; + context * dummy_; + alignas(cache_alignment) std::atomic< context * > head_; + alignas(cache_alignment) context * tail_; + char pad_[cacheline_length]; + +public: + context_mpsc_queue() : + dummy_{ reinterpret_cast< context * >( std::addressof( storage_) ) }, + head_{ dummy_ }, + tail_{ dummy_ } { + dummy_->remote_nxt_.store( nullptr, std::memory_order_release); + } + + context_mpsc_queue( context_mpsc_queue const&) = delete; + context_mpsc_queue & operator=( context_mpsc_queue const&) = delete; + + void push( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + ctx->remote_nxt_.store( nullptr, std::memory_order_release); + context * prev = head_.exchange( ctx, std::memory_order_acq_rel); + prev->remote_nxt_.store( ctx, std::memory_order_release); + } + + context * pop() noexcept { + context * tail = tail_; + context * next = tail->remote_nxt_.load( std::memory_order_acquire); + if ( dummy_ == tail) { + if ( nullptr == next) { + return nullptr; + } + tail_ = next; + tail = next; + next = next->remote_nxt_.load( std::memory_order_acquire);; + } + if ( nullptr != next) { + tail_ = next; + return tail; + } + context * head = head_.load( std::memory_order_acquire); + if ( tail != head) { + return nullptr; + } + push( dummy_); + next = tail->remote_nxt_.load( std::memory_order_acquire); + if ( nullptr != next) { + tail_= next; + return tail; + } + return nullptr; + } +}; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 5d8bdc2e..26e64baa 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -51,7 +52,6 @@ public: intrusive::constant_time_size< false > > ready_queue_t; private: #if ! defined(BOOST_FIBERS_NO_ATOMICS) - typedef std::vector< context * > remote_ready_queue_t; #endif typedef intrusive::set< context, @@ -82,10 +82,9 @@ private: #if ! defined(BOOST_FIBERS_NO_ATOMICS) // remote ready-queue contains context' signaled by schedulers // running in other threads - remote_ready_queue_t remote_ready_queue_{}; - std::mutex remote_ready_mtx_{}; + detail::context_mpsc_queue remote_ready_queue_{}; + // sleep-queue contains context' which have been called #endif - // sleep-queue cotnains context' whic hahve been called // scheduler::wait_until() sleep_queue_t sleep_queue_{}; bool shutdown_{ false }; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 43c091a9..44c07238 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -56,14 +56,12 @@ scheduler::release_terminated_() noexcept { #if ! defined(BOOST_FIBERS_NO_ATOMICS) void scheduler::remote_ready2ready_() noexcept { - // protect for concurrent access - std::unique_lock< std::mutex > lk( remote_ready_mtx_); + context * ctx = nullptr; // get context from remote ready-queue - for ( context * ctx : remote_ready_queue_) { + while ( nullptr != ( ctx = remote_ready_queue_.pop() ) ) { // store context in local queues set_ready( ctx); } - remote_ready_queue_.clear(); } #endif @@ -115,7 +113,6 @@ scheduler::~scheduler() { BOOST_ASSERT( worker_queue_.empty() ); BOOST_ASSERT( terminated_queue_.empty() ); #if ! defined(BOOST_FIBERS_NO_ATOMICS) - BOOST_ASSERT( remote_ready_queue_.empty() ); #endif BOOST_ASSERT( sleep_queue_.empty() ); // set active context to nullptr @@ -215,11 +212,8 @@ scheduler::set_remote_ready( context * ctx) noexcept { // context ctx might in wait-/ready-/sleep-queue // we do not test this in this function // scheduler::dispatcher() has to take care - // protect for concurrent access - std::unique_lock< std::mutex > lk( remote_ready_mtx_); // push new context to remote ready-queue - remote_ready_queue_.push_back( ctx); - lk.unlock(); + remote_ready_queue_.push( ctx); // notify scheduler algo_->notify(); }