diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 9e7b3134..2a4757e7 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -85,6 +85,14 @@ typedef intrusive::list_member_hook< > > remote_ready_hook; +struct yield_tag; +typedef intrusive::list_member_hook< + intrusive::tag< yield_tag >, + intrusive::link_mode< + intrusive::auto_unlink + > +> yield_hook; + struct sleep_tag; typedef intrusive::set_member_hook< intrusive::tag< sleep_tag >, @@ -175,6 +183,7 @@ public: detail::terminated_hook terminated_hook_; detail::ready_hook ready_hook_; detail::remote_ready_hook remote_ready_hook_; + detail::yield_hook yield_hook_; detail::sleep_hook sleep_hook_; detail::wait_hook wait_hook_; std::chrono::steady_clock::time_point tp_; @@ -302,6 +311,7 @@ public: terminated_hook_(), ready_hook_(), remote_ready_hook_(), + yield_hook_(), sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -391,6 +401,8 @@ public: bool remote_ready_is_linked(); + bool yield_is_linked(); + bool sleep_is_linked(); bool wait_is_linked(); @@ -419,6 +431,12 @@ public: lst.push_back( * this); } + template< typename List > + void yield_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + template< typename Set > void sleep_link( Set & set) { std::unique_lock< detail::spinlock > lk( hook_splk_); @@ -437,6 +455,8 @@ public: void remote_ready_unlink(); + void yield_unlink(); + void sleep_unlink(); void wait_unlink(); diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 16454ed3..0d2bfd17 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -44,6 +44,11 @@ public: intrusive::member_hook< context, detail::remote_ready_hook, & context::remote_ready_hook_ >, intrusive::constant_time_size< false > > remote_ready_queue_t; + typedef intrusive::list< + context, + intrusive::member_hook< + context, detail::yield_hook, & context::yield_hook_ >, + intrusive::constant_time_size< false > > yield_queue_t; typedef intrusive::set< context, intrusive::member_hook< @@ -80,9 +85,11 @@ private: remote_ready_queue_t remote_ready_queue_; // sleep-queue cotnains context' whic hahve been called // scheduler::wait_until() + yield_queue_t yield_queue_; sleep_queue_t sleep_queue_; bool shutdown_; detail::spinlock remote_ready_splk_; + std::mutex mtx_; void resume_( context *, context *); @@ -92,6 +99,8 @@ private: void remote_ready2ready_(); + void yield2ready_(); + void sleep2ready_() noexcept; public: diff --git a/src/context.cpp b/src/context.cpp index 532bfb05..459f1e4f 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -170,6 +170,7 @@ context::context( main_context_t) : terminated_hook_(), ready_hook_(), remote_ready_hook_(), + yield_hook_(), sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -197,6 +198,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall terminated_hook_(), ready_hook_(), remote_ready_hook_(), + yield_hook_(), sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -418,6 +420,12 @@ context::remote_ready_is_linked() { return remote_ready_hook_.is_linked(); } +bool +context::yield_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); + return yield_hook_.is_linked(); +} + bool context::sleep_is_linked() { std::unique_lock< detail::spinlock > lk( hook_splk_); @@ -448,6 +456,12 @@ context::remote_ready_unlink() { remote_ready_hook_.unlink(); } +void +context::yield_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); + yield_hook_.unlink(); +} + void context::sleep_unlink() { std::unique_lock< detail::spinlock > lk( hook_splk_); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 8c1e6d6e..694535ee 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -48,6 +48,9 @@ scheduler::resume_( context * active_ctx, context * ctx) { BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); + // move yielded context' to ready-queue + yield2ready_(); + // check if unwinding was requested if ( active_ctx->unwinding_requested() ) { throw forced_unwind(); } @@ -55,7 +58,14 @@ scheduler::resume_( context * active_ctx, context * ctx) { context * scheduler::get_next_() noexcept { - return sched_algo_->pick_next(); + context * ctx = sched_algo_->pick_next(); + if ( nullptr != ctx && + ! ctx->worker_is_linked() && + ! ctx->is_main_context() && + ! ctx->is_dispatcher_context() ) { + ctx->worker_link( worker_queue_); + } + return ctx; } void @@ -94,6 +104,18 @@ scheduler::remote_ready2ready_() { } } +void +scheduler::yield2ready_() { + std::unique_lock< std::mutex > lk( mtx_); + // get context from yield-queue + while ( ! yield_queue_.empty() ) { + context * ctx = & yield_queue_.front(); + yield_queue_.pop_front(); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + set_ready( ctx); + } +} + void scheduler::sleep2ready_() noexcept { // move context which the deadline has reached @@ -133,6 +155,7 @@ scheduler::scheduler() noexcept : worker_queue_(), terminated_queue_(), remote_ready_queue_(), + yield_queue_(), sleep_queue_(), shutdown_( false), remote_ready_splk_() { @@ -151,6 +174,7 @@ scheduler::~scheduler() noexcept { BOOST_ASSERT( terminated_queue_.empty() ); BOOST_ASSERT( ! sched_algo_->has_ready_fibers() ); BOOST_ASSERT( remote_ready_queue_.empty() ); + BOOST_ASSERT( yield_queue_.empty() ); BOOST_ASSERT( sleep_queue_.empty() ); // set active context to nullptr context::reset_active(); @@ -193,6 +217,8 @@ void scheduler::dispatch() { BOOST_ASSERT( context::active() == dispatcher_ctx_); while ( ! shutdown_) { + // move yielded context' to ready-queue + yield2ready_(); // release termianted context' release_terminated_(); // get sleeping context' @@ -247,8 +273,10 @@ void scheduler::set_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ! ctx->is_terminated() ); + // dispatcher-context will never be passed to set_ready() + BOOST_ASSERT( ! ctx->is_dispatcher_context() ); // we do not test for wait-queue because - // context::wait_is_linked() is not sychronized + // context::wait_is_linked() is not synchronized // with other threads //BOOST_ASSERT( active_ctx->wait_is_linked() ); // handle newly created context @@ -260,7 +288,7 @@ scheduler::set_ready( context * ctx) noexcept { ctx->worker_link( worker_queue_); } } else { - // sanity checks, main-contxt might by signaled + // sanity checks, main-context might by signaled // from another thread BOOST_ASSERT( main_ctx_ == ctx); BOOST_ASSERT( this == ctx->get_scheduler() ); @@ -272,13 +300,12 @@ scheduler::set_ready( context * ctx) noexcept { // unlink it from sleep-queue ctx->sleep_unlink(); } - // if context is already in ready-queue, do return - // this might happend if a newly created fiber was + // for safety unlink it from ready-queue + // this might happen if a newly created fiber was // signaled to interrupt - if ( ! ctx->ready_is_linked() ) { - // push new context to ready-queue - sched_algo_->awakened( ctx); - } + ctx->ready_unlink(); + // push new context to ready-queue + sched_algo_->awakened( ctx); } void @@ -329,8 +356,13 @@ scheduler::yield( context * active_ctx) noexcept { // we do not test for wait-queue because // context::wait_is_linked() is not sychronized // with other threads - // push active context to ready-queue - sched_algo_->awakened( active_ctx); + // push active context to yield-queue + // in work-sharing context (multiple threads read + // from one ready-queue) the context must be + // already suspended until another thread resumes it + std::unique_lock< std::mutex > lk( mtx_); + active_ctx->yield_link( yield_queue_); + lk.unlock(); // resume another fiber resume_( active_ctx, get_next_() ); }