diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index b25cddfd..b1504941 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -255,6 +255,8 @@ public: bool wait_until( std::chrono::steady_clock::time_point const&) noexcept; + void set_ready( context *) noexcept; + bool is_main_context() const noexcept { return 0 != ( flags_ & flag_main_context); } diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 0b50c42b..c8b40a3c 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -54,10 +55,12 @@ private: context * main_ctx_; intrusive_ptr< context > dispatcher_ctx_; ready_queue_t ready_queue_; + ready_queue_t remote_ready_queue_; sleep_queue_t sleep_queue_; terminated_queue_t terminated_queue_; bool shutdown_; detail::autoreset_event ready_queue_ev_; + detail::spinlock remote_ready_splk_; void resume_( context *, context *); @@ -83,6 +86,8 @@ public: void set_ready( context *) noexcept; + void set_remote_ready( context *) noexcept; + void set_terminated( context *) noexcept; void yield( context *) noexcept; diff --git a/src/context.cpp b/src/context.cpp index b538132f..29bf954c 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -171,6 +171,22 @@ context::wait_until( std::chrono::steady_clock::time_point const& tp) noexcept { return scheduler_->wait_until( this, tp); } +void +context::set_ready( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( this != ctx); + // FIXME: comparing scheduler address' must be synchronized? + // what if ctx is migrated between threads + // (other scheduler assigned) + if ( scheduler_ == ctx->scheduler_) { + // local + scheduler_->set_ready( ctx); + } else { + // remote + scheduler_->set_remote_ready( ctx); + } +} + bool context::wait_is_linked() { return wait_hook_.is_linked(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index f09a0c99..95b47f1f 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -7,6 +7,7 @@ #include "boost/fiber/scheduler.hpp" #include +#include #include @@ -88,9 +89,11 @@ scheduler::scheduler() noexcept : main_ctx_( nullptr), dispatcher_ctx_(), ready_queue_(), + remote_ready_queue_(), terminated_queue_(), shutdown_( false), - ready_queue_ev_() { + ready_queue_ev_(), + remote_ready_splk_() { } scheduler::~scheduler() noexcept { @@ -135,11 +138,19 @@ scheduler::dispatch() { release_terminated_(); // get sleeping context' woken_up_(); + // protect for concurrent access + std::unique_lock< detail::spinlock > lk( remote_ready_splk_); + // get from remote ready-queue + ready_queue_.splice( ready_queue_.end(), remote_ready_queue_); + lk.unlock(); context * ctx = nullptr; // loop till we get next ready context while ( nullptr == ( ctx = get_next_() ) ) { - // TODO: move context' from remote ready-queue to local ready-queue - // + // protect for concurrent access + lk.lock(); + // get from remote ready-queue + ready_queue_.splice( ready_queue_.end(), remote_ready_queue_); + lk.unlock(); // no ready context, wait till signaled // set deadline to highest value std::chrono::steady_clock::time_point tp = @@ -181,6 +192,21 @@ scheduler::set_ready( context * ctx) noexcept { ready_queue_.push_back( * ctx); } +void +scheduler::set_remote_ready( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + // set the scheduler for new context + ctx->set_scheduler( this); + // protect for concurrent access + std::unique_lock< detail::spinlock > lk( remote_ready_splk_); + // push new context to remote ready-queue + remote_ready_queue_.push_back( * ctx); +} + void scheduler::set_terminated( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx);