diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 55ae6b91..053f5544 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -70,6 +70,14 @@ typedef intrusive::list_member_hook< > > ready_hook; +struct remote_ready_tag; +typedef intrusive::list_member_hook< + intrusive::tag< remote_ready_tag >, + intrusive::link_mode< + intrusive::safe_link + > +> remote_ready_hook; + struct sleep_tag; typedef intrusive::set_member_hook< intrusive::tag< sleep_tag >, @@ -120,6 +128,7 @@ private: public: detail::ready_hook ready_hook_; + detail::remote_ready_hook remote_ready_hook_; detail::sleep_hook sleep_hook_; detail::terminated_hook terminated_hook_; detail::wait_hook wait_hook_; @@ -227,6 +236,7 @@ public: BOOST_ASSERT_MSG( false, "fiber already terminated"); }), ready_hook_(), + remote_ready_hook_(), terminated_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -272,12 +282,14 @@ public: return 0 != ( flags_ & flag_terminated); } - bool wait_is_linked(); - bool ready_is_linked(); + bool remote_ready_is_linked(); + bool sleep_is_linked(); + bool wait_is_linked(); + void sleep_unlink(); void wait_unlink(); diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index c8b40a3c..2d5da7d3 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -38,6 +38,11 @@ private: intrusive::member_hook< context, detail::ready_hook, & context::ready_hook_ >, intrusive::constant_time_size< false > > ready_queue_t; + typedef intrusive::list< + context, + intrusive::member_hook< + context, detail::remote_ready_hook, & context::remote_ready_hook_ >, + intrusive::constant_time_size< false > > remote_ready_queue_t; typedef intrusive::set< context, intrusive::member_hook< @@ -55,7 +60,7 @@ private: context * main_ctx_; intrusive_ptr< context > dispatcher_ctx_; ready_queue_t ready_queue_; - ready_queue_t remote_ready_queue_; + remote_ready_queue_t remote_ready_queue_; sleep_queue_t sleep_queue_; terminated_queue_t terminated_queue_; bool shutdown_; @@ -68,6 +73,8 @@ private: void release_terminated_(); + void move_from_remote_(); + void woken_up_() noexcept; public: diff --git a/src/context.cpp b/src/context.cpp index 1f0b2e14..872fe46b 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -57,6 +57,7 @@ context::context( main_context_t) : scheduler_( nullptr), ctx_( boost::context::execution_context::current() ), ready_hook_(), + remote_ready_hook_(), terminated_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -78,6 +79,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall BOOST_ASSERT_MSG( false, "disatcher fiber already terminated"); }), ready_hook_(), + remote_ready_hook_(), terminated_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), @@ -86,9 +88,11 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall } context::~context() { - BOOST_ASSERT( ! wait_is_linked() ); BOOST_ASSERT( wait_queue_.empty() ); BOOST_ASSERT( ! ready_is_linked() ); + BOOST_ASSERT( ! remote_ready_is_linked() ); + BOOST_ASSERT( ! sleep_is_linked() ); + BOOST_ASSERT( ! wait_is_linked() ); } void @@ -187,21 +191,26 @@ context::set_ready( context * ctx) noexcept { } } -bool -context::wait_is_linked() { - return wait_hook_.is_linked(); -} - bool context::ready_is_linked() { return ready_hook_.is_linked(); } +bool +context::remote_ready_is_linked() { + return remote_ready_hook_.is_linked(); +} + bool context::sleep_is_linked() { return sleep_hook_.is_linked(); } +bool +context::wait_is_linked() { + return wait_hook_.is_linked(); +} + void context::sleep_unlink() { sleep_hook_.unlink(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 6a53a12d..747dcf8b 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -58,32 +58,50 @@ scheduler::release_terminated_() { } void -scheduler::woken_up_() noexcept { - // move context which the deadline has reached - // to ready-queue - // sleep-queue is sorted (ascending) - std::chrono::steady_clock::time_point now = - std::chrono::steady_clock::now(); - sleep_queue_t::iterator e = sleep_queue_.end(); - for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { - context * ctx = & ( * i); - BOOST_ASSERT( ! ctx->is_terminated() ); - BOOST_ASSERT( ! ctx->ready_is_linked() ); - BOOST_ASSERT( ctx->sleep_is_linked() ); - // ctx->wait_is_linked() might return true if - // context is waiting in time_mutex::try_lock_until() - // set fiber to state_ready if deadline was reached - if ( ctx->tp_ <= now) { - // remove context from sleep-queue - i = sleep_queue_.erase( i); - // reset sleep-tp - ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); - // push new context to ready-queue - ready_queue_.push_back( * ctx); - } else { - break; // first element with ctx->tp_ > now, leave for-loop - } +scheduler::move_from_remote_() { + // protect for concurrent access + std::unique_lock< detail::spinlock > lk( remote_ready_splk_); + // get from remote ready-queue + remote_ready_queue_t::iterator e = remote_ready_queue_.end(); + for ( remote_ready_queue_t::iterator i = remote_ready_queue_.begin(); i != e;) { + context * ctx = & ( * i); + i = remote_ready_queue_.erase( i); + // context must no be contained in ready-queue + // no need to check agains active context + // because we are in scheduler::dispatch() -> dispatcher-context + if ( ! ctx->ready_is_linked() ) { + ready_queue_.push_back( * ctx); } + } +} + +void +scheduler::woken_up_() noexcept { + // move context which the deadline has reached + // to ready-queue + // sleep-queue is sorted (ascending) + std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + sleep_queue_t::iterator e = sleep_queue_.end(); + for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { + context * ctx = & ( * i); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ctx->sleep_is_linked() ); + // ctx->wait_is_linked() might return true if + // context is waiting in time_mutex::try_lock_until() + // set fiber to state_ready if deadline was reached + if ( ctx->tp_ <= now) { + // remove context from sleep-queue + i = sleep_queue_.erase( i); + // reset sleep-tp + ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); + // push new context to ready-queue + ready_queue_.push_back( * ctx); + } else { + break; // first element with ctx->tp_ > now, leave for-loop + } + } } scheduler::scheduler() noexcept : @@ -138,19 +156,13 @@ scheduler::dispatch() { release_terminated_(); // get sleeping context' woken_up_(); - // protect for concurrent access - std::unique_lock< detail::spinlock > lk( remote_ready_splk_); - // get from remote ready-queue - ready_queue_.splice( ready_queue_.end(), remote_ready_queue_); - lk.unlock(); + // get context' from remote ready-queue + move_from_remote_(); context * ctx = nullptr; // loop till we get next ready context while ( nullptr == ( ctx = get_next_() ) ) { - // protect for concurrent access - lk.lock(); - // get from remote ready-queue - ready_queue_.splice( ready_queue_.end(), remote_ready_queue_); - lk.unlock(); + // get context' from remote ready-queue + move_from_remote_(); // no ready context, wait till signaled // set deadline to highest value std::chrono::steady_clock::time_point tp =