diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 7cb5b553..3eb3d2e4 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -98,6 +98,14 @@ typedef intrusive::list_member_hook< > > terminated_hook; +struct managed_tag; +typedef intrusive::list_member_hook< + intrusive::tag< managed_tag >, + intrusive::link_mode< + intrusive::auto_unlink + > +> worker_hook; + } struct main_context_t {}; @@ -117,7 +125,8 @@ private: flag_worker_context = 1 << 3, flag_terminated = 1 << 4, flag_interruption_blocked = 1 << 5, - flag_interruption_requested = 1 << 6 + flag_interruption_requested = 1 << 6, + flag_forced_unwind = 1 << 7 }; struct BOOST_FIBERS_DECL fss_data { @@ -156,10 +165,11 @@ private: boost::context::execution_context ctx_; public: + detail::worker_hook worker_hook_; + detail::terminated_hook terminated_hook_; detail::ready_hook ready_hook_; detail::remote_ready_hook remote_ready_hook_; detail::sleep_hook sleep_hook_; - detail::terminated_hook terminated_hook_; detail::wait_hook wait_hook_; std::chrono::steady_clock::time_point tp_; @@ -259,6 +269,7 @@ public: // invoke fiber function boost::context::detail::invoke_helper( std::move( fn), std::move( tpl) ); } catch ( fiber_interrupted const&) { + } catch ( forced_unwind const&) { } catch ( ... ) { std::terminate(); } @@ -270,9 +281,11 @@ public: suspend(); BOOST_ASSERT_MSG( false, "fiber already terminated"); }), + worker_hook_(), + terminated_hook_(), ready_hook_(), remote_ready_hook_(), - terminated_hook_(), + sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), @@ -328,6 +341,14 @@ public: return 0 != ( flags_ & flag_interruption_requested); } + void request_interruption( bool req) noexcept; + + bool unwinding_requested() const noexcept { + return 0 != ( flags_ & flag_forced_unwind); + } + + void request_unwinding() noexcept; + void * get_fss_data( void const * vp) const; void set_fss_data( @@ -336,7 +357,7 @@ public: void * data, bool cleanup_existing); - void request_interruption( bool req) noexcept; + bool managed_is_linked(); bool ready_is_linked(); @@ -346,6 +367,8 @@ public: bool wait_is_linked(); + void managed_unlink(); + void sleep_unlink(); void wait_unlink(); diff --git a/include/boost/fiber/exceptions.hpp b/include/boost/fiber/exceptions.hpp index b02d68ae..5af77655 100644 --- a/include/boost/fiber/exceptions.hpp +++ b/include/boost/fiber/exceptions.hpp @@ -24,6 +24,8 @@ namespace boost { namespace fibers { +struct forced_unwind {}; + class fiber_exception : public std::system_error { public: fiber_exception() : diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index dbcb92e5..2402bcaf 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -56,13 +56,30 @@ private: detail::terminated_hook, & context::terminated_hook_ >, intrusive::constant_time_size< false > > terminated_queue_t; + typedef intrusive::list< + context, + intrusive::member_hook< + context, + detail::worker_hook, + & context::worker_hook_ >, + intrusive::constant_time_size< false > > worker_queue_t; context * main_ctx_; intrusive_ptr< context > dispatcher_ctx_; - ready_queue_t ready_queue_; - remote_ready_queue_t remote_ready_queue_; - sleep_queue_t sleep_queue_; + // worker-queue contains all context' mananged by this scheduler + // except main-context and dispatcher-context + // unlink happens on destruction of a context + worker_queue_t worker_queue_; + // terminated-queue contains context' which have been terminated terminated_queue_t terminated_queue_; + // ready-queue contains context' ready to be resumed + ready_queue_t ready_queue_; + // remote ready-queue contains context' signaled by schedulers + // running in other threads + remote_ready_queue_t remote_ready_queue_; + // sleep-queue cotnains context' whic hahve been called + // scheduler::wait_until() + sleep_queue_t sleep_queue_; bool shutdown_; detail::autoreset_event ready_queue_ev_; detail::spinlock remote_ready_splk_; diff --git a/src/context.cpp b/src/context.cpp index 62bb42d7..859dba58 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -58,9 +58,11 @@ context::context( main_context_t) : flags_( flag_main_context), scheduler_( nullptr), ctx_( boost::context::execution_context::current() ), + worker_hook_(), + terminated_hook_(), ready_hook_(), remote_ready_hook_(), - terminated_hook_(), + sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), @@ -81,9 +83,11 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall // dispatcher context should never return from scheduler::dispatch() BOOST_ASSERT_MSG( false, "disatcher fiber already terminated"); }), + worker_hook_(), + terminated_hook_(), ready_hook_(), remote_ready_hook_(), - terminated_hook_(), + sleep_hook_(), wait_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), @@ -196,6 +200,8 @@ void context::set_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( this != ctx); + BOOST_ASSERT( nullptr != scheduler_); + BOOST_ASSERT( nullptr != ctx->scheduler_); // FIXME: comparing scheduler address' must be synchronized? // what if ctx is migrated between threads // (other scheduler assigned) @@ -227,6 +233,13 @@ context::request_interruption( bool req) noexcept { } } +void +context::request_unwinding() noexcept { + BOOST_ASSERT( ! is_main_context() ); + BOOST_ASSERT( ! is_dispatcher_context() ); + flags_ |= flag_forced_unwind; +} + void * context::get_fss_data( void const * vp) const { uintptr_t key( reinterpret_cast< uintptr_t >( vp) ); @@ -263,6 +276,11 @@ context::set_fss_data( void const * vp, } } +bool +context::managed_is_linked() { + return worker_hook_.is_linked(); +} + bool context::ready_is_linked() { return ready_hook_.is_linked(); @@ -283,6 +301,11 @@ context::wait_is_linked() { return wait_hook_.is_linked(); } +void +context::managed_unlink() { + worker_hook_.unlink(); +} + void context::sleep_unlink() { sleep_hook_.unlink(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 5d6db0e3..66c8f735 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -12,6 +12,7 @@ #include #include "boost/fiber/context.hpp" +#include "boost/fiber/exceptions.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -24,6 +25,12 @@ void scheduler::resume_( context * active_ctx, context * ctx) { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( main_ctx_ == active_ctx || + dispatcher_ctx_.get() == active_ctx || + active_ctx->managed_is_linked() ); + BOOST_ASSERT( main_ctx_ == ctx || + dispatcher_ctx_.get() == ctx || + ctx->managed_is_linked() ); BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() ); // fiber next-to-run is same as current active-fiber // this might happen in context of this_fiber::yield() @@ -35,6 +42,12 @@ scheduler::resume_( context * active_ctx, context * ctx) { // resume active-fiber == ctx ctx->resume(); BOOST_ASSERT( context::active() == active_ctx); + BOOST_ASSERT( main_ctx_ == active_ctx || + dispatcher_ctx_.get() == active_ctx || + active_ctx->managed_is_linked() ); + if ( ctx->unwinding_requested() ) { + throw forced_unwind(); + } } context * @@ -53,7 +66,16 @@ scheduler::release_terminated_() { for ( terminated_queue_t::iterator i( terminated_queue_.begin() ); i != e;) { context * ctx = & ( * i); + BOOST_ASSERT( ! ctx->is_main_context() ); + BOOST_ASSERT( ! ctx->is_dispatcher_context() ); + BOOST_ASSERT( ctx->managed_is_linked() ); + BOOST_ASSERT( ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); i = terminated_queue_.erase( i); + // if last reference, e.g. fiber::join() or fiber::detach() + // have been already called, this will call ~context(), + // the context is automatically removeid from worker-queue intrusive_ptr_release( ctx); } } @@ -65,16 +87,12 @@ scheduler::remote_ready2ready_() { std::unique_lock< detail::spinlock > lk( remote_ready_splk_); remote_ready_queue_.swap( tmp); lk.unlock(); - // get from remote ready-queue + // get context from remote ready-queue while ( ! tmp.empty() ) { context * ctx = & tmp.front(); tmp.pop_front(); - // context must no be contained in ready-queue - // no need to check agains active context - // because we are in scheduler::dispatch() -> dispatcher-context - if ( ! ctx->ready_is_linked() ) { - ready_queue_.push_back( * ctx); - } + // store context in local queues + set_ready( ctx); } } @@ -88,6 +106,9 @@ scheduler::sleep2ready_() noexcept { sleep_queue_t::iterator e = sleep_queue_.end(); for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { context * ctx = & ( * i); + BOOST_ASSERT( ! ctx->is_dispatcher_context() ); + BOOST_ASSERT( main_ctx_ == ctx || + ctx->managed_is_linked() ); BOOST_ASSERT( ! ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ctx->sleep_is_linked() ); @@ -102,7 +123,7 @@ scheduler::sleep2ready_() noexcept { // push new context to ready-queue ready_queue_.push_back( * ctx); } else { - break; // first element with ctx->tp_ > now, leave for-loop + break; // first context with now < deadline } } } @@ -110,9 +131,11 @@ scheduler::sleep2ready_() noexcept { scheduler::scheduler() noexcept : main_ctx_( nullptr), dispatcher_ctx_(), + worker_queue_(), + terminated_queue_(), ready_queue_(), remote_ready_queue_(), - terminated_queue_(), + sleep_queue_(), shutdown_( false), ready_queue_ev_(), remote_ready_splk_() { @@ -126,6 +149,12 @@ scheduler::~scheduler() noexcept { shutdown_ = true; // resume pending fibers resume_( main_ctx_, get_next_() ); + // no context' in worker-queue + BOOST_ASSERT( worker_queue_.empty() ); + BOOST_ASSERT( terminated_queue_.empty() ); + BOOST_ASSERT( ready_queue_.empty() ); + BOOST_ASSERT( remote_ready_queue_.empty() ); + BOOST_ASSERT( sleep_queue_.empty() ); // deallocate dispatcher-context dispatcher_ctx_.reset(); // set main-context to nullptr @@ -135,6 +164,9 @@ scheduler::~scheduler() noexcept { void scheduler::set_main_context( context * main_ctx) noexcept { BOOST_ASSERT( nullptr != main_ctx); + // main-context represents the execution context created + // by the system, e.g. main()- or thread-context + // should not be in worker-queue main_ctx_ = main_ctx; main_ctx_->set_scheduler( this); } @@ -142,6 +174,12 @@ scheduler::set_main_context( context * main_ctx) noexcept { void scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noexcept { BOOST_ASSERT( dispatcher_ctx); + // dispatcher context has to handle + // - remote ready context' + // - sleeping context' + // - extern event-loops + // - suspending the thread if ready-queue is empty (waiting on external event) + // should not be in worker-queue dispatcher_ctx_.swap( dispatcher_ctx); // add dispatcher-context to ready-queue // so it is the first element in the ready-queue @@ -192,8 +230,17 @@ scheduler::dispatch() { resume_( dispatcher_ctx_.get(), ctx); BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } - // interrupt all context' in ready- and sleep-queue - release_terminated_(); + // loop till all context' have been terminated + while ( ! worker_queue_.empty() ) { + release_terminated_(); + // force unwinding of all context' in worker-queue + worker_queue_t::iterator e = worker_queue_.end(); + for ( worker_queue_t::iterator i = worker_queue_.begin(); i != e; ++i) { + context * ctx = & ( * i); + ctx->request_unwinding(); + set_ready( ctx); + } + } resume_( dispatcher_ctx_.get(), main_ctx_); } @@ -201,34 +248,51 @@ void scheduler::set_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ! ctx->is_terminated() ); - // if context is already in ready-queue, return - // this might happend if a newly create fiber is - // signaled to interrupt - if ( ctx->ready_is_linked() ) { - return; - } // we do not test for wait-queue because // context::wait_is_linked() is not sychronized // with other threads + //BOOST_ASSERT( active_ctx->wait_is_linked() ); + // handle newly created context + if ( ! ctx->is_main_context() ) { + if ( ! ctx->managed_is_linked() ) { + // attach context to `this`-scheduler + ctx->set_scheduler( this); + // push to the worker-queue + worker_queue_.push_back( * ctx); + } + } else { + // sanity checks, main-contxt might by signaled + // from another thread + BOOST_ASSERT( main_ctx_ == ctx); + BOOST_ASSERT( this == ctx->get_scheduler() ); + } // remove context ctx from sleep-queue // (might happen if blocked in timed_mutex::try_lock_until()) + // FIXME: mabye better done in scheduler::dispatch() if ( ctx->sleep_is_linked() ) { // unlink it from sleep-queue ctx->sleep_unlink(); } - // set the scheduler for new context - ctx->set_scheduler( this); - // push new context to ready-queue - ready_queue_.push_back( * ctx); + // if context is already in ready-queue, do return + // this might happend if a newly created fiber was + // signaled to interrupt + if ( ! ctx->ready_is_linked() ) { + // push new context to ready-queue + ready_queue_.push_back( * ctx); + } } void scheduler::set_remote_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->is_dispatcher_context() ); + BOOST_ASSERT( this == ctx->get_scheduler() ); + // another thread might signal the main-context + // from this thread + //BOOST_ASSERT( ! ctx->is_main_context() ); // context ctx might in wait-/ready-/sleep-queue // we do not test this in this function // scheduler::dispatcher() has to take care - ctx->set_scheduler( this); // protect for concurrent access std::unique_lock< detail::spinlock > lk( remote_ready_splk_); // push new context to remote ready-queue @@ -238,6 +302,9 @@ scheduler::set_remote_ready( context * ctx) noexcept { void scheduler::set_terminated( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->is_main_context() ); + BOOST_ASSERT( ! ctx->is_dispatcher_context() ); + BOOST_ASSERT( ctx->managed_is_linked() ); BOOST_ASSERT( ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ! ctx->sleep_is_linked() ); @@ -251,6 +318,9 @@ scheduler::set_terminated( context * ctx) noexcept { void scheduler::yield( context * active_ctx) noexcept { BOOST_ASSERT( nullptr != active_ctx); + BOOST_ASSERT( main_ctx_ == active_ctx || + dispatcher_ctx_.get() == active_ctx || + active_ctx->managed_is_linked() ); BOOST_ASSERT( ! active_ctx->is_terminated() ); BOOST_ASSERT( ! active_ctx->ready_is_linked() ); BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); @@ -267,6 +337,9 @@ bool scheduler::wait_until( context * active_ctx, std::chrono::steady_clock::time_point const& sleep_tp) noexcept { BOOST_ASSERT( nullptr != active_ctx); + BOOST_ASSERT( main_ctx_ == active_ctx || + dispatcher_ctx_.get() == active_ctx || + active_ctx->managed_is_linked() ); BOOST_ASSERT( ! active_ctx->is_terminated() ); // if the active-fiber running in this thread calls // condition:wait() and code in another thread calls @@ -294,6 +367,9 @@ scheduler::wait_until( context * active_ctx, void scheduler::re_schedule( context * active_ctx) noexcept { BOOST_ASSERT( nullptr != active_ctx); + BOOST_ASSERT( main_ctx_ == active_ctx || + dispatcher_ctx_.get() == active_ctx || + active_ctx->managed_is_linked() ); // resume another context resume_( active_ctx, get_next_() ); }