2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-20 14:42:21 +00:00

use MPSC queue for remote-ready-queue

This commit is contained in:
Oliver Kowalke
2016-10-17 19:21:57 +02:00
parent b53e167a68
commit 38f1bc5945
5 changed files with 103 additions and 14 deletions

View File

@@ -219,6 +219,7 @@ public:
detail::terminated_hook terminated_hook_{};
detail::wait_hook wait_hook_{};
detail::worker_hook worker_hook_{};
std::atomic< context * > remote_nxt_{ nullptr };
std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() };
typedef intrusive::list<

View File

@@ -44,5 +44,4 @@
static constexpr std::size_t cache_alignment{ 64 };
static constexpr std::size_t cacheline_length{ 64 };
#endif // BOOST_FIBERS_DETAIL_CONFIG_H

View File

@@ -0,0 +1,96 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
// based on Dmitry Vyukov's intrusive MPSC queue
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
#ifndef BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H
#include <atomic>
#include <memory>
#include <type_traits>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace detail {
// a MPSC queue
// multiple threads push ready fibers (belonging to local scheduler)
// (thread) local scheduler pops fibers
class context_mpsc_queue {
private:
// not default constructor for context - use aligned_storage instead
alignas(cache_alignment) std::aligned_storage< sizeof( context), alignof( context) >::type storage_{};
context * dummy_;
alignas(cache_alignment) std::atomic< context * > head_;
alignas(cache_alignment) context * tail_;
char pad_[cacheline_length];
public:
context_mpsc_queue() :
dummy_{ reinterpret_cast< context * >( std::addressof( storage_) ) },
head_{ dummy_ },
tail_{ dummy_ } {
dummy_->remote_nxt_.store( nullptr, std::memory_order_release);
}
context_mpsc_queue( context_mpsc_queue const&) = delete;
context_mpsc_queue & operator=( context_mpsc_queue const&) = delete;
void push( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
ctx->remote_nxt_.store( nullptr, std::memory_order_release);
context * prev = head_.exchange( ctx, std::memory_order_acq_rel);
prev->remote_nxt_.store( ctx, std::memory_order_release);
}
context * pop() noexcept {
context * tail = tail_;
context * next = tail->remote_nxt_.load( std::memory_order_acquire);
if ( dummy_ == tail) {
if ( nullptr == next) {
return nullptr;
}
tail_ = next;
tail = next;
next = next->remote_nxt_.load( std::memory_order_acquire);;
}
if ( nullptr != next) {
tail_ = next;
return tail;
}
context * head = head_.load( std::memory_order_acquire);
if ( tail != head) {
return nullptr;
}
push( dummy_);
next = tail->remote_nxt_.load( std::memory_order_acquire);
if ( nullptr != next) {
tail_= next;
return tail;
}
return nullptr;
}
};
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H

View File

@@ -21,6 +21,7 @@
#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/context_mpsc_queue.hpp>
#include <boost/fiber/detail/data.hpp>
#include <boost/fiber/detail/spinlock.hpp>
@@ -51,7 +52,6 @@ public:
intrusive::constant_time_size< false > > ready_queue_t;
private:
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
typedef std::vector< context * > remote_ready_queue_t;
#endif
typedef intrusive::set<
context,
@@ -82,10 +82,9 @@ private:
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
// remote ready-queue contains context' signaled by schedulers
// running in other threads
remote_ready_queue_t remote_ready_queue_{};
std::mutex remote_ready_mtx_{};
detail::context_mpsc_queue remote_ready_queue_{};
// sleep-queue contains context' which have been called
#endif
// sleep-queue cotnains context' whic hahve been called
// scheduler::wait_until()
sleep_queue_t sleep_queue_{};
bool shutdown_{ false };

View File

@@ -56,14 +56,12 @@ scheduler::release_terminated_() noexcept {
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
void
scheduler::remote_ready2ready_() noexcept {
// protect for concurrent access
std::unique_lock< std::mutex > lk( remote_ready_mtx_);
context * ctx = nullptr;
// get context from remote ready-queue
for ( context * ctx : remote_ready_queue_) {
while ( nullptr != ( ctx = remote_ready_queue_.pop() ) ) {
// store context in local queues
set_ready( ctx);
}
remote_ready_queue_.clear();
}
#endif
@@ -115,7 +113,6 @@ scheduler::~scheduler() {
BOOST_ASSERT( worker_queue_.empty() );
BOOST_ASSERT( terminated_queue_.empty() );
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
BOOST_ASSERT( remote_ready_queue_.empty() );
#endif
BOOST_ASSERT( sleep_queue_.empty() );
// set active context to nullptr
@@ -215,11 +212,8 @@ scheduler::set_remote_ready( context * ctx) noexcept {
// context ctx might in wait-/ready-/sleep-queue
// we do not test this in this function
// scheduler::dispatcher() has to take care
// protect for concurrent access
std::unique_lock< std::mutex > lk( remote_ready_mtx_);
// push new context to remote ready-queue
remote_ready_queue_.push_back( ctx);
lk.unlock();
remote_ready_queue_.push( ctx);
// notify scheduler
algo_->notify();
}