diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 82b43ce1..2dc50d4f 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -18,6 +19,7 @@ #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -40,7 +42,7 @@ typedef intrusive::list_member_hook< > > wait_hook; // declaration of the functor that converts between -// the context class and the hook +// the context class and the wait-hook struct wait_functor { // required types typedef wait_hook hook_type; @@ -65,44 +67,60 @@ typedef intrusive::list_member_hook< > > ready_hook; +struct terminated_tag; +typedef intrusive::list_member_hook< + intrusive::tag< terminated_tag >, + intrusive::link_mode< + intrusive::auto_unlink + > +> terminated_hook; + } +struct main_context_t {}; +constexpr main_context_t main_context = main_context_t(); + +struct dispatcher_context_t {}; +constexpr dispatcher_context_t dispatcher_context = dispatcher_context_t(); + +struct worker_context_t {}; +constexpr worker_context_t worker_context = worker_context_t(); + class BOOST_FIBERS_DECL context { public: + detail::ready_hook ready_hook_; + detail::terminated_hook terminated_hook_; detail::wait_hook wait_hook_; - typedef intrusive::list< context, - intrusive::function_hook< detail::wait_functor >, - intrusive::constant_time_size< false > > wait_queue_t; + typedef intrusive::list< + context, + intrusive::function_hook< detail::wait_functor >, + intrusive::constant_time_size< false > > wait_queue_t; private: - enum class status { - ready = 0, - running, - waiting, - terminated - }; - enum flag_t { flag_main_context = 1 << 1, - flag_dispatcher_context = 1 << 2 + flag_dispatcher_context = 1 << 2, + flag_worker_context = 1 << 3 }; static thread_local context * active_; #if ! defined(BOOST_FIBERS_NO_ATOMICS) std::atomic< std::size_t > use_count_; - std::atomic< status > state_; std::atomic< int > flags_; #else std::size_t use_count_; - status state_; int flags_; #endif scheduler * scheduler_; boost::context::execution_context ctx_; wait_queue_t wait_queue_; + void set_terminated_() noexcept; + + void suspend_() noexcept; + protected: virtual void deallocate() { } @@ -128,19 +146,19 @@ public: bool operator!=( id const& other) const noexcept { return impl_ != other.impl_; } - + bool operator<( id const& other) const noexcept { return impl_ < other.impl_; } - + bool operator>( id const& other) const noexcept { return other.impl_ < impl_; } - + bool operator<=( id const& other) const noexcept { return ! ( * this > other); } - + bool operator>=( id const& other) const noexcept { return ! ( * this < other); } @@ -164,50 +182,43 @@ public: } }; - detail::ready_hook ready_hook_; - static context * active() noexcept; static void active( context * active) noexcept; - // main fiber - context() : - use_count_( 1), // allocated on main- or thread-stack - state_( status::running), - flags_( flag_main_context), - scheduler_( nullptr), - ctx_( boost::context::execution_context::current() ), - wait_queue_(), - ready_hook_() { - } + // main fiber context + context( main_context_t); - // worker fiber + // dispatcher fiber context + context( dispatcher_context_t, boost::context::preallocated const&, + fixedsize_stack const&, scheduler *); + + // worker fiber context template< typename StackAlloc, typename Fn, typename ... Args > - context( boost::context::preallocated palloc, StackAlloc salloc, + context( worker_context_t, + boost::context::preallocated palloc, StackAlloc salloc, Fn && fn, Args && ... args) : - use_count_( 1), // allocated on fiber-stack - state_( status::ready), - flags_( 0), + ready_hook_(), + terminated_hook_(), + wait_hook_(), + use_count_( 1), // fiber instance or scheduler owner + flags_( flag_worker_context), scheduler_( nullptr), ctx_( palloc, salloc, - // lambda, executed in execution context // mutable: generated operator() is not const -> enables std::move( fn) // std::make_tuple: stores decayed copies of its args, implicitly unwraps std::reference_wrapper [=,fn=std::forward< Fn >( fn),tpl=std::make_tuple( std::forward< Args >( args) ...)] () mutable -> void { - //FIXME: invoke function - + // invoke fiber function + boost::context::detail::invoke_helper( std::move( fn), std::move( tpl) ); // mark fiber as terminated - set_terminated(); - + set_terminated_(); // notify waiting (joining) fibers release(); - - // FIXME: switch to another fiber - + // switch to another fiber + suspend_(); BOOST_ASSERT_MSG( false, "fiber already terminated"); }), - wait_queue_(), - ready_hook_() { + wait_queue_() { } virtual ~context(); @@ -218,96 +229,46 @@ public: id get_id() const noexcept; - bool is_main_context() const noexcept { - return 0 != ( flags_ & flag_main_context); - } - - bool is_terminated() const noexcept { - return status::terminated == state_; - } - - bool is_ready() const noexcept { - return status::ready == state_; - } - - bool is_running() const noexcept { - return status::running == state_; - } - - bool is_waiting() const noexcept { - return status::waiting == state_; - } - - void set_terminated() noexcept { -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - status previous = state_.exchange( status::terminated); -#else - status previous = state_; - state_ = status::terminated; -#endif - BOOST_ASSERT( status::running == previous); - (void)previous; - } - - void set_ready() noexcept { -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - status previous = state_.exchange( status::ready); -#else - status previous = state_; - state_ = status::ready; -#endif - BOOST_ASSERT( status::waiting == previous || status::running == previous || status::ready == previous); - (void)previous; - } - - void set_running() noexcept { -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - status previous = state_.exchange( status::running); -#else - status previous = state_; - state_ = status::running; -#endif - BOOST_ASSERT( status::ready == previous); - (void)previous; - } - - void set_waiting() noexcept { -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - status previous = state_.exchange( status::waiting); -#else - status previous = state_; - state_ = status::waiting; -#endif - BOOST_ASSERT( status::waiting == previous || status::running == previous); - (void)previous; - } - void resume(); void release() noexcept; - bool wait_is_linked(); + bool is_main_context() const noexcept { + return 0 != ( flags_ & flag_main_context); + } - void wait_unlink(); + bool is_dispatcher_context() const noexcept { + return 0 != ( flags_ & flag_dispatcher_context); + } + + bool is_worker_context() const noexcept { + return 0 != ( flags_ & flag_worker_context); + } + + bool wait_is_linked(); bool ready_is_linked(); - friend void intrusive_ptr_add_ref( context * f) { - BOOST_ASSERT( nullptr != f); - ++f->use_count_; + bool terminated_is_linked(); + + void wait_unlink(); + + friend void intrusive_ptr_add_ref( context * ctx) { + BOOST_ASSERT( nullptr != ctx); + ++ctx->use_count_; } - friend void intrusive_ptr_release( context * f) { - BOOST_ASSERT( nullptr != f); - if ( 0 == --f->use_count_) { - BOOST_ASSERT( f->is_terminated() ); - f->~context(); + friend void intrusive_ptr_release( context * ctx) { + BOOST_ASSERT( nullptr != ctx); + if ( 0 == --ctx->use_count_) { + ctx->~context(); } } }; -template< typename StackAlloc, typename Fn, typename ... Args > -static intrusive_ptr< context > make_context( StackAlloc salloc, Fn && fn, Args && ... args) { +inline +static intrusive_ptr< context > make_dispatcher_context( scheduler * sched) { + fixedsize_stack salloc; // use default satck-size boost::context::stack_context sctx( salloc.allocate() ); #if defined(BOOST_NO_CXX14_CONSTEXPR) || defined(BOOST_NO_CXX11_STD_ALIGN) // reserve space for control structure @@ -328,6 +289,35 @@ static intrusive_ptr< context > make_context( StackAlloc salloc, Fn && fn, Args // placement new of context on top of fiber's stack return intrusive_ptr< context >( new ( sp) context( + dispatcher_context, + boost::context::preallocated( sp, size, sctx), + salloc, + sched) ); +} + +template< typename StackAlloc, typename Fn, typename ... Args > +static intrusive_ptr< context > make_worker_context( StackAlloc salloc, Fn && fn, Args && ... args) { + boost::context::stack_context sctx( salloc.allocate() ); +#if defined(BOOST_NO_CXX14_CONSTEXPR) || defined(BOOST_NO_CXX11_STD_ALIGN) + // reserve space for control structure + std::size_t size = sctx.size - sizeof( context); + void * sp = static_cast< char * >( sctx.sp) - sizeof( context); +#else + constexpr std::size_t func_alignment = 64; // alignof( context); + constexpr std::size_t func_size = sizeof( context); + // reserve space on stack + void * sp = static_cast< char * >( sctx.sp) - func_size - func_alignment; + // align sp pointer + std::size_t space = func_size + func_alignment; + sp = std::align( func_alignment, func_size, sp, space); + BOOST_ASSERT( nullptr != sp); + // calculate remaining size + std::size_t size = sctx.size - ( static_cast< char * >( sctx.sp) - static_cast< char * >( sp) ); +#endif + // placement new of context on top of fiber's stack + return intrusive_ptr< context >( + new ( sp) context( + worker_context, boost::context::preallocated( sp, size, sctx), salloc, std::forward< Fn >( fn), diff --git a/include/boost/fiber/fiber.hpp b/include/boost/fiber/fiber.hpp index 12d79dcd..87b1ecc8 100644 --- a/include/boost/fiber/fiber.hpp +++ b/include/boost/fiber/fiber.hpp @@ -52,7 +52,7 @@ public: template< typename StackAllocator, typename Fn, typename ... Args > explicit fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) : - impl_( make_context( salloc, std::forward< Fn >( fn), std::forward< Args >( args) ... ) ) { + impl_( make_worker_context( salloc, std::forward< Fn >( fn), std::forward< Args >( args) ... ) ) { start_(); } @@ -81,11 +81,11 @@ public: } explicit operator bool() const noexcept { - return impl_ && ! impl_->is_terminated(); + return nullptr != impl_.get(); } bool operator!() const noexcept { - return ! impl_ || impl_->is_terminated(); + return nullptr == impl_.get(); } void swap( fiber & other) noexcept { diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 5c2e96e5..3faaf2ae 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -20,8 +20,6 @@ namespace boost { namespace fibers { -class context; - class BOOST_FIBERS_DECL scheduler { private: typedef intrusive::list< @@ -29,13 +27,24 @@ private: intrusive::member_hook< context, detail::ready_hook, & context::ready_hook_ >, intrusive::constant_time_size< false > > ready_queue_t; + typedef intrusive::list< + context, + intrusive::member_hook< + context, detail::terminated_hook, & context::terminated_hook_ >, + intrusive::constant_time_size< false > > terminated_queue_t; context * main_ctx_; intrusive_ptr< context > dispatching_ctx_; ready_queue_t ready_queue_; + terminated_queue_t terminated_queue_; + bool shutdown_; void resume_( context *, context *); + context * get_next_() noexcept; + + void release_terminated_(); + public: scheduler() noexcept; @@ -44,11 +53,15 @@ public: virtual ~scheduler() noexcept; - void main_context( context *) noexcept; + void set_main_context( context *) noexcept; - void dispatching_context( intrusive_ptr< context >) noexcept; + void set_dispatching_context( intrusive_ptr< context >) noexcept; - void dispatch() noexcept; + void dispatch(); + + void set_terminated( context *) noexcept; + + void re_schedule( context *) noexcept; }; }} diff --git a/src/context.cpp b/src/context.cpp index 5c5ad989..f25e9e62 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -6,7 +6,6 @@ #include "boost/fiber/context.hpp" -#include "boost/fiber/fixedsize_stack.hpp" #include "boost/fiber/scheduler.hpp" #ifdef BOOST_HAS_ABI_HEADERS @@ -16,25 +15,23 @@ namespace boost { namespace fibers { -static context * main_context() { +static context * make_main_context() { // main fiber context for this thread - static thread_local context main_ctx; + static thread_local context main_ctx( main_context); // scheduler for this thread static thread_local scheduler sched; // attach scheduler to main fiber context main_ctx.set_scheduler( & sched); // attach main fiber context to scheduler - sched.main_context( & main_ctx); - // attach dispatching fiber context to scheduler - sched.dispatching_context( - make_context( - fixedsize_stack(), - & scheduler::dispatch, & sched) ); + sched.set_main_context( & main_ctx); + // create and attach dispatching fiber context to scheduler + sched.set_dispatching_context( + make_dispatcher_context( & sched) ); return & main_ctx; } thread_local context * -context::active_ = main_context(); +context::active_ = make_main_context(); context * context::active() noexcept { @@ -47,6 +44,47 @@ context::active( context * active) noexcept { active_ = active; } +void +context::set_terminated_() noexcept { + scheduler_->set_terminated( this); +} + +void +context::suspend_() noexcept { + scheduler_->re_schedule( this); +} + +// main fiber context +context::context( main_context_t) : + ready_hook_(), + terminated_hook_(), + wait_hook_(), + use_count_( 1), // allocated on main- or thread-stack + flags_( flag_main_context), + scheduler_( nullptr), + ctx_( boost::context::execution_context::current() ), + wait_queue_() { +} + +// dispatcher fiber context +context::context( dispatcher_context_t, boost::context::preallocated const& palloc, + fixedsize_stack const& salloc, scheduler * sched) : + ready_hook_(), + terminated_hook_(), + wait_hook_(), + use_count_( 0), // scheduler will own dispatcher context + flags_( flag_dispatcher_context), + scheduler_( nullptr), + ctx_( palloc, salloc, + [=] () -> void { + // execute scheduler::dispatch() + sched->dispatch(); + // dispatcher context should never return from scheduler::dispatch() + BOOST_ASSERT_MSG( false, "disatcher fiber already terminated"); + }), + wait_queue_() { +} + context::~context() { BOOST_ASSERT( ! wait_is_linked() ); BOOST_ASSERT( wait_queue_.empty() ); @@ -71,13 +109,12 @@ context::get_id() const noexcept { void context::resume() { - BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm ctx_(); } void context::release() noexcept { - BOOST_ASSERT( is_terminated() ); + BOOST_ASSERT( terminated_is_linked() ); // notify all waiting fibers wait_queue_t::iterator e = wait_queue_.end(); @@ -93,16 +130,21 @@ context::wait_is_linked() { return wait_hook_.is_linked(); } -void -context::wait_unlink() { - wait_hook_.unlink(); -} - bool context::ready_is_linked() { return ready_hook_.is_linked(); } +bool +context::terminated_is_linked() { + return terminated_hook_.is_linked(); +} + +void +context::wait_unlink() { + wait_hook_.unlink(); +} + }} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/src/fiber.cpp b/src/fiber.cpp index d914c3a4..5c0fafc5 100644 --- a/src/fiber.cpp +++ b/src/fiber.cpp @@ -19,7 +19,6 @@ namespace fibers { void fiber::start_() { - impl_->set_ready(); //FIXME: spawn new fiber-context } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 57a3cda1..8518c10f 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -18,15 +18,12 @@ namespace boost { namespace fibers { void -scheduler::resume_( context * af, context * f) { - BOOST_ASSERT( nullptr != af); - BOOST_ASSERT( nullptr != f); - BOOST_ASSERT( f->is_ready() ); - // set fiber to state running - f->set_running(); +scheduler::resume_( context * actx, context * ctx) { + BOOST_ASSERT( nullptr != actx); + BOOST_ASSERT( nullptr != ctx); // fiber next-to-run is same as current active-fiber // this might happen in context of this_fiber::yield() - if ( f == af) { + if ( actx == ctx) { return; } // pass new fiber the scheduler of the current active fiber @@ -35,40 +32,63 @@ scheduler::resume_( context * af, context * f) { // FIXME: mabye better don in the sched-algorithm (knows // if fiber was migrated) // -> performance issue? - f->set_scheduler( af->get_scheduler() ); + ctx->set_scheduler( actx->get_scheduler() ); // assign new fiber to active-fiber - context::active( f); - // resume active-fiber == f - f->resume(); + context::active( ctx); + // resume active-fiber == ctx + ctx->resume(); +} + +context * +scheduler::get_next_() noexcept { + BOOST_ASSERT( ! ready_queue_.empty() ); + context * ctx( & ready_queue_.front() ); + ready_queue_.pop_front(); + return ctx; +} + +void +scheduler::release_terminated_() { + terminated_queue_t::iterator e( terminated_queue_.end() ); + for ( terminated_queue_t::iterator i( terminated_queue_.begin() ); + i != e;) { + context * ctx( & ( * i) ); + i = terminated_queue_.erase( i); + intrusive_ptr_release( ctx); + } } scheduler::scheduler() noexcept : main_ctx_( nullptr), - dispatching_ctx_( nullptr), - ready_queue_() { + dispatching_ctx_(), + ready_queue_(), + terminated_queue_(), + shutdown_( false) { } scheduler::~scheduler() noexcept { BOOST_ASSERT( nullptr != main_ctx_); - BOOST_ASSERT( dispatching_ctx_); + BOOST_ASSERT( nullptr != dispatching_ctx_.get() ); BOOST_ASSERT( context::active() == main_ctx_); - // FIXME: signal dispatching context termination - // wait till dispatching context agrees on termination - - // reset intrusive-pointer to dispatching context - // should destroy dispatching context stack + // signal dispatching context termination + shutdown_ = true; + // resume pending fibers + resume_( main_ctx_, get_next_() ); + // deallocate dispatching-context dispatching_ctx_.reset(); + // set main-context to nullptr + main_ctx_ = nullptr; } void -scheduler::main_context( context * main_ctx) noexcept { +scheduler::set_main_context( context * main_ctx) noexcept { BOOST_ASSERT( nullptr != main_ctx); main_ctx_ = main_ctx; } void -scheduler::dispatching_context( intrusive_ptr< context > dispatching_ctx) noexcept { +scheduler::set_dispatching_context( intrusive_ptr< context > dispatching_ctx) noexcept { BOOST_ASSERT( dispatching_ctx); dispatching_ctx_.swap( dispatching_ctx); // add dispatching context to ready-queue @@ -80,10 +100,26 @@ scheduler::dispatching_context( intrusive_ptr< context > dispatching_ctx) noexce } void -scheduler::dispatch() noexcept { - if ( ready_queue.empty() ) { - // FIXME: sleep +scheduler::dispatch() { + while ( ! shutdown_) { + release_terminated_(); } + release_terminated_(); + resume_( dispatching_ctx_.get(), main_ctx_); +} + +void +scheduler::set_terminated( context * ctx) noexcept { + BOOST_ASSERT( nullptr != ctx); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + terminated_queue_.push_back( * ctx); +} + +void +scheduler::re_schedule( context * actx) noexcept { + BOOST_ASSERT( nullptr != actx); + resume_( actx, get_next_() ); } }}