diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 04965c13..41cd4ef5 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -430,6 +430,8 @@ public: return policy_; } + bool worker_is_linked() const noexcept; + bool ready_is_linked() const noexcept; bool sleep_is_linked() const noexcept; @@ -438,45 +440,50 @@ public: bool wait_is_linked() const noexcept; - bool worker_is_linked() const noexcept; + template< typename List > + void worker_link( List & lst) noexcept { + static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue"); + BOOST_ASSERT( ! worker_is_linked() ); + lst.push_back( * this); + } template< typename List > void ready_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::ready_hook >::value, "not a ready-queue"); + BOOST_ASSERT( ! ready_is_linked() ); lst.push_back( * this); } template< typename Set > void sleep_link( Set & set) noexcept { static_assert( std::is_same< typename Set::value_traits::hook_type,detail::sleep_hook >::value, "not a sleep-queue"); + BOOST_ASSERT( ! sleep_is_linked() ); set.insert( * this); } template< typename List > void terminated_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::terminated_hook >::value, "not a terminated-queue"); + BOOST_ASSERT( ! terminated_is_linked() ); lst.push_back( * this); } template< typename List > void wait_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::wait_hook >::value, "not a wait-queue"); + BOOST_ASSERT( ! wait_is_linked() ); lst.push_back( * this); } - template< typename List > - void worker_link( List & lst) noexcept { - static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue"); - lst.push_back( * this); - } + void worker_unlink() noexcept; void ready_unlink() noexcept; void sleep_unlink() noexcept; - void wait_unlink() noexcept; + void terminated_unlink() noexcept; - void worker_unlink() noexcept; + void wait_unlink() noexcept; void detach() noexcept; diff --git a/src/context.cpp b/src/context.cpp index 9b8a4119..e210e014 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -479,11 +479,6 @@ context::worker_is_linked() const noexcept { return worker_hook_.is_linked(); } -bool -context::terminated_is_linked() const noexcept { - return terminated_hook_.is_linked(); -} - bool context::ready_is_linked() const noexcept { return ready_hook_.is_linked(); @@ -494,6 +489,11 @@ context::sleep_is_linked() const noexcept { return sleep_hook_.is_linked(); } +bool +context::terminated_is_linked() const noexcept { + return terminated_hook_.is_linked(); +} + bool context::wait_is_linked() const noexcept { return wait_hook_.is_linked(); @@ -517,6 +517,12 @@ context::sleep_unlink() noexcept { sleep_hook_.unlink(); } +void +context::terminated_unlink() noexcept { + BOOST_ASSERT( terminated_is_linked() ); + terminated_hook_.unlink(); +} + void context::wait_unlink() noexcept { BOOST_ASSERT( wait_is_linked() ); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index a6bb2baf..96cd8421 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -28,16 +28,15 @@ scheduler::release_terminated_() noexcept { for ( terminated_queue_t::iterator i( terminated_queue_.begin() ); i != e;) { context * ctx = & ( * i); - BOOST_ASSERT( ! ctx->is_context( type::main_context) ); - BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) ); - BOOST_ASSERT( ctx->worker_is_linked() ); - BOOST_ASSERT( ctx->is_terminated() ); - BOOST_ASSERT( ! ctx->ready_is_linked() ); - BOOST_ASSERT( ! ctx->sleep_is_linked() ); - // remove context from worker-queue - ctx->worker_unlink(); // remove context from terminated-queue i = terminated_queue_.erase( i); + BOOST_ASSERT( ctx->is_context( type::worker_context) ); + BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); + BOOST_ASSERT( ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->worker_is_linked() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // if last reference, e.g. fiber::join() or fiber::detach() // have been already called, this will call ~context(), // the context is automatically removeid from worker-queue @@ -67,13 +66,14 @@ scheduler::sleep2ready_() noexcept { sleep_queue_t::iterator e = sleep_queue_.end(); for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { context * ctx = & ( * i); + // dipatcher context must never be pushed to sleep-queue BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) ); - //BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() ); + BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() ); BOOST_ASSERT( ! ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); - BOOST_ASSERT( ctx->sleep_is_linked() ); - // ctx->wait_is_linked() might return true if - // context is waiting in time_mutex::try_lock_until() + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + // no test for wait-queue because ctx + // might be waiting in time_mutex::try_lock_until() // set fiber to state_ready if deadline was reached if ( ctx->tp_ <= now) { // remove context from sleep-queue @@ -121,11 +121,10 @@ void scheduler::dispatch() noexcept { BOOST_ASSERT( context::active() == dispatcher_ctx_); for (;;) { - bool no_worker = worker_queue_.empty(); if ( shutdown_) { // notify sched-algorithm about termination algo_->notify(); - if ( no_worker) { + if ( worker_queue_.empty() ) { break; } } @@ -168,11 +167,10 @@ boost::context::continuation scheduler::dispatch() noexcept { BOOST_ASSERT( context::active() == dispatcher_ctx_); for (;;) { - bool no_worker = worker_queue_.empty(); if ( shutdown_) { // notify sched-algorithm about termination algo_->notify(); - if ( no_worker) { + if ( worker_queue_.empty() ) { break; } } @@ -216,10 +214,9 @@ void scheduler::set_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ! ctx->is_terminated() ); - // we do not test for wait-queue because - // context::wait_is_linked() is not synchronized - // with other threads - //BOOST_ASSERT( active_ctx->wait_is_linked() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // remove context ctx from sleep-queue // (might happen if blocked in timed_mutex::try_lock_until()) if ( ctx->sleep_is_linked() ) { @@ -234,14 +231,14 @@ scheduler::set_ready( context * ctx) noexcept { void scheduler::set_remote_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); + // another thread might signal the main-context of this thread BOOST_ASSERT( ! ctx->is_context( type::dispatcher_context) ); BOOST_ASSERT( this == ctx->get_scheduler() ); - // another thread might signal the main-context - // from this thread - //BOOST_ASSERT( ! ctx->is_context( type::main_context) ); - // context ctx might in wait-/ready-/sleep-queue - // we do not test this in this function - // scheduler::dispatcher() has to take care + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // push new context to remote ready-queue remote_ready_queue_.push( ctx); // notify scheduler @@ -251,85 +248,75 @@ scheduler::set_remote_ready( context * ctx) noexcept { #if (BOOST_EXECUTION_CONTEXT==1) void -scheduler::set_terminated( context * active_ctx) noexcept { - BOOST_ASSERT( nullptr != active_ctx); - BOOST_ASSERT( context::active() == active_ctx); - BOOST_ASSERT( ! active_ctx->is_context( type::main_context) ); - BOOST_ASSERT( ! active_ctx->is_context( type::dispatcher_context) ); - //BOOST_ASSERT( active_ctx->worker_is_linked() ); - BOOST_ASSERT( active_ctx->is_terminated() ); - BOOST_ASSERT( ! active_ctx->ready_is_linked() ); - BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); - BOOST_ASSERT( ! active_ctx->wait_is_linked() ); +scheduler::set_terminated( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() == ctx); + BOOST_ASSERT( ctx->is_context( type::worker_context) ); + BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); + BOOST_ASSERT( ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // store the terminated fiber in the terminated-queue - // the dispatcher-context will call + // the dispatcher-context will call // intrusive_ptr_release( ctx); - active_ctx->terminated_link( terminated_queue_); + ctx->terminated_link( terminated_queue_); + // remove from the worker-queue + ctx->worker_unlink(); // resume another fiber algo_->pick_next()->resume(); } #else boost::context::continuation -scheduler::set_terminated( context * active_ctx) noexcept { - BOOST_ASSERT( nullptr != active_ctx); - BOOST_ASSERT( context::active() == active_ctx); - BOOST_ASSERT( ! active_ctx->is_context( type::main_context) ); - BOOST_ASSERT( ! active_ctx->is_context( type::dispatcher_context) ); - //BOOST_ASSERT( active_ctx->worker_is_linked() ); - BOOST_ASSERT( active_ctx->is_terminated() ); - BOOST_ASSERT( ! active_ctx->ready_is_linked() ); - BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); - BOOST_ASSERT( ! active_ctx->wait_is_linked() ); +scheduler::set_terminated( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() == ctx); + BOOST_ASSERT( ctx->is_context( type::worker_context) ); + BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); + BOOST_ASSERT( ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // store the terminated fiber in the terminated-queue - // the dispatcher-context will call + // the dispatcher-context will call // intrusive_ptr_release( ctx); - active_ctx->terminated_link( terminated_queue_); + ctx->terminated_link( terminated_queue_); + // remove from the worker-queue + ctx->worker_unlink(); // resume another fiber return algo_->pick_next()->suspend_with_cc(); } #endif void -scheduler::yield( context * active_ctx) noexcept { - BOOST_ASSERT( nullptr != active_ctx); - //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); - BOOST_ASSERT( ! active_ctx->is_terminated() ); - BOOST_ASSERT( ! active_ctx->ready_is_linked() ); - BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); - // we do not test for wait-queue because - // context::wait_is_linked() is not sychronized - // with other threads - // defer passing active context to set_ready() - // in work-sharing context (multiple threads read - // from one ready-queue) the context must be - // already suspended until another thread resumes it - // (== maked as ready) +scheduler::yield( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() == ctx); + BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); // resume another fiber - algo_->pick_next()->resume( active_ctx); + algo_->pick_next()->resume( ctx); } bool -scheduler::wait_until( context * active_ctx, +scheduler::wait_until( context * ctx, std::chrono::steady_clock::time_point const& sleep_tp) noexcept { - BOOST_ASSERT( nullptr != active_ctx); - //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); - BOOST_ASSERT( ! active_ctx->is_terminated() ); - // if the active-fiber running in this thread calls - // condition_variable:wait() and code in another thread calls - // condition_variable::notify_one(), it might happen that the - // other thread pushes the fiber to remote ready-queue first - // the dispatcher-context migh have been moved the fiber from - // the remote ready-queue to the local ready-queue - // so we do not check - //BOOST_ASSERT( active_ctx->ready_is_linked() ); - BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); - // active_ctx->wait_is_linked() might return true - // if context was locked inside timed_mutex::try_lock_until() - // context::wait_is_linked() is not sychronized - // with other threads - // push active context to sleep-queue - active_ctx->tp_ = sleep_tp; - active_ctx->sleep_link( sleep_queue_); + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() == ctx); + BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->tp_ = sleep_tp; + ctx->sleep_link( sleep_queue_); // resume another context algo_->pick_next()->resume(); // context has been resumed @@ -338,28 +325,21 @@ scheduler::wait_until( context * active_ctx, } bool -scheduler::wait_until( context * active_ctx, +scheduler::wait_until( context * ctx, std::chrono::steady_clock::time_point const& sleep_tp, detail::spinlock_lock & lk) noexcept { - BOOST_ASSERT( nullptr != active_ctx); - //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); - BOOST_ASSERT( ! active_ctx->is_terminated() ); - // if the active-fiber running in this thread calls - // condition_variable:wait() and code in another thread calls - // condition_variable::notify_one(), it might happen that the - // other thread pushes the fiber to remote ready-queue first - // the dispatcher-context migh have been moved the fiber from - // the remote ready-queue to the local ready-queue - // so we do not check - //BOOST_ASSERT( active_ctx->ready_is_linked() ); - BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); - // active_ctx->wait_is_linked() might return true + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( context::active() == ctx); + BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) ); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->terminated_is_linked() ); + // ctx->wait_is_linked() might return true // if context was locked inside timed_mutex::try_lock_until() - // context::wait_is_linked() is not sychronized - // with other threads // push active context to sleep-queue - active_ctx->tp_ = sleep_tp; - active_ctx->sleep_link( sleep_queue_); + ctx->tp_ = sleep_tp; + ctx->sleep_link( sleep_queue_); // resume another context algo_->pick_next()->resume( lk); // context has been resumed @@ -394,12 +374,12 @@ scheduler::set_algo( std::unique_ptr< algo::algorithm > algo) noexcept { } void -scheduler::attach_main_context( context * main_ctx) noexcept { - BOOST_ASSERT( nullptr != main_ctx); +scheduler::attach_main_context( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); // main-context represents the execution context created // by the system, e.g. main()- or thread-context // should not be in worker-queue - main_ctx_ = main_ctx; + main_ctx_ = ctx; #if ! defined(BOOST_FIBERS_NO_ATOMICS) main_ctx_->scheduler_.store( this, std::memory_order_relaxed); #else @@ -408,15 +388,15 @@ scheduler::attach_main_context( context * main_ctx) noexcept { } void -scheduler::attach_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noexcept { - BOOST_ASSERT( dispatcher_ctx); +scheduler::attach_dispatcher_context( intrusive_ptr< context > ctx) noexcept { + BOOST_ASSERT( ctx); // dispatcher context has to handle // - remote ready context' // - sleeping context' // - extern event-loops // - suspending the thread if ready-queue is empty (waiting on external event) // should not be in worker-queue - dispatcher_ctx_.swap( dispatcher_ctx); + dispatcher_ctx_.swap( ctx); // add dispatcher-context to ready-queue // so it is the first element in the ready-queue // if the main context tries to suspend the first time