diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 96a17c33..ae522418 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -81,6 +81,7 @@ private: sleep_queue_t sleep_queue_; bool shutdown_; detail::spinlock remote_ready_splk_; + detail::spinlock worker_splk_; void resume_( context *, context *, std::function< void() > *); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 81f35cd1..5dd1e1a6 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -26,12 +26,7 @@ void scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > * func) { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( main_ctx_ == active_ctx || - dispatcher_ctx_.get() == active_ctx || - active_ctx->worker_is_linked() ); - BOOST_ASSERT( main_ctx_ == ctx || - dispatcher_ctx_.get() == ctx || - ctx->worker_is_linked() ); + //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); BOOST_ASSERT( this == active_ctx->get_scheduler() ); BOOST_ASSERT( this == ctx->get_scheduler() ); BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() ); @@ -41,9 +36,6 @@ scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > // resume active-fiber == ctx func = ctx->resume( func); BOOST_ASSERT( context::active() == active_ctx); - BOOST_ASSERT( main_ctx_ == active_ctx || - dispatcher_ctx_.get() == active_ctx || - active_ctx->worker_is_linked() ); if ( nullptr != func) { ( * func)(); } @@ -67,12 +59,14 @@ scheduler::release_terminated_() { context * ctx = & ( * i); BOOST_ASSERT( ! ctx->is_main_context() ); BOOST_ASSERT( ! ctx->is_dispatcher_context() ); - BOOST_ASSERT( ctx->worker_is_linked() ); + //BOOST_ASSERT( ctx->worker_is_linked() ); BOOST_ASSERT( ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ! ctx->sleep_is_linked() ); // remove context from worker-queue + std::unique_lock< detail::spinlock > lk( worker_splk_); ctx->worker_unlink(); + lk.unlock(); // remove context from terminated-queue i = terminated_queue_.erase( i); // if last reference, e.g. fiber::join() or fiber::detach() @@ -106,8 +100,7 @@ scheduler::sleep2ready_() noexcept { for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { context * ctx = & ( * i); BOOST_ASSERT( ! ctx->is_dispatcher_context() ); - BOOST_ASSERT( main_ctx_ == ctx || - ctx->worker_is_linked() ); + //BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() ); BOOST_ASSERT( ! ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ctx->sleep_is_linked() ); @@ -136,7 +129,8 @@ scheduler::scheduler() noexcept : remote_ready_queue_(), sleep_queue_(), shutdown_( false), - remote_ready_splk_() { + remote_ready_splk_(), + worker_splk_() { } scheduler::~scheduler() noexcept { @@ -148,7 +142,7 @@ scheduler::~scheduler() noexcept { // resume pending fibers resume_( main_ctx_, get_next_(), nullptr); // no context' in worker-queue - BOOST_ASSERT( worker_queue_.empty() ); + //BOOST_ASSERT( worker_queue_.empty() ); BOOST_ASSERT( terminated_queue_.empty() ); BOOST_ASSERT( ! sched_algo_->has_ready_fibers() ); BOOST_ASSERT( remote_ready_queue_.empty() ); @@ -201,8 +195,8 @@ scheduler::dispatch() { BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } // loop till all context' have been terminated + std::unique_lock< detail::spinlock > lk( worker_splk_); while ( ! worker_queue_.empty() ) { - release_terminated_(); // force unwinding of all context' in worker-queue worker_queue_t::iterator e = worker_queue_.end(); for ( worker_queue_t::iterator i = worker_queue_.begin(); i != e;) { @@ -225,6 +219,7 @@ scheduler::dispatch() { BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } } + lk.unlock(); // release termianted context' release_terminated_(); // return to main-context @@ -281,7 +276,7 @@ scheduler::set_terminated( context * active_ctx) noexcept { BOOST_ASSERT( context::active() == active_ctx); BOOST_ASSERT( ! active_ctx->is_main_context() ); BOOST_ASSERT( ! active_ctx->is_dispatcher_context() ); - BOOST_ASSERT( active_ctx->worker_is_linked() ); + //BOOST_ASSERT( active_ctx->worker_is_linked() ); BOOST_ASSERT( active_ctx->is_terminated() ); BOOST_ASSERT( ! active_ctx->ready_is_linked() ); BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); @@ -297,9 +292,7 @@ scheduler::set_terminated( context * active_ctx) noexcept { void scheduler::yield( context * active_ctx) noexcept { BOOST_ASSERT( nullptr != active_ctx); - BOOST_ASSERT( main_ctx_ == active_ctx || - dispatcher_ctx_.get() == active_ctx || - active_ctx->worker_is_linked() ); + //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); BOOST_ASSERT( ! active_ctx->is_terminated() ); BOOST_ASSERT( ! active_ctx->ready_is_linked() ); BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); @@ -323,9 +316,7 @@ scheduler::wait_until( context * active_ctx, std::chrono::steady_clock::time_point const& sleep_tp, std::function< void() > * func) noexcept { BOOST_ASSERT( nullptr != active_ctx); - BOOST_ASSERT( main_ctx_ == active_ctx || - dispatcher_ctx_.get() == active_ctx || - active_ctx->worker_is_linked() ); + //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); BOOST_ASSERT( ! active_ctx->is_terminated() ); // if the active-fiber running in this thread calls // condition:wait() and code in another thread calls @@ -354,9 +345,7 @@ void scheduler::suspend( context * active_ctx, std::function< void() > * func) noexcept { BOOST_ASSERT( nullptr != active_ctx); - BOOST_ASSERT( main_ctx_ == active_ctx || - dispatcher_ctx_.get() == active_ctx || - active_ctx->worker_is_linked() ); + //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); // resume another context resume_( active_ctx, get_next_(), func); } @@ -412,7 +401,8 @@ scheduler::attach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); - BOOST_ASSERT( ! ctx->worker_is_linked() ); + //BOOST_ASSERT( ! ctx->worker_is_linked() ); + std::unique_lock< detail::spinlock > lk( worker_splk_); ctx->worker_link( worker_queue_); ctx->scheduler_ = this; } @@ -426,6 +416,7 @@ scheduler::detach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); + std::unique_lock< detail::spinlock > lk( worker_splk_); ctx->worker_unlink(); }