From 4c791a3790ebd03f16229286d1a70e437562f5af Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Tue, 29 Sep 2015 17:54:29 +0200 Subject: [PATCH] support for fiber properties + custom scheduler --- build/Jamfile.v2 | 5 +- include/boost/fiber/algorithm.hpp | 107 ++++++++++++++++++++++++++++ include/boost/fiber/all.hpp | 1 + include/boost/fiber/context.hpp | 11 ++- include/boost/fiber/fiber.hpp | 8 +++ include/boost/fiber/operations.hpp | 38 ++++++++++ include/boost/fiber/properties.hpp | 76 ++++++++++++++++++++ include/boost/fiber/round_robin.hpp | 43 +++++++++++ include/boost/fiber/scheduler.hpp | 30 ++++---- src/algorithm.cpp | 34 +++++++++ src/context.cpp | 13 +++- src/properties.cpp | 40 +++++++++++ src/round_robin.cpp | 47 ++++++++++++ src/scheduler.cpp | 34 +++++---- 14 files changed, 458 insertions(+), 29 deletions(-) create mode 100644 include/boost/fiber/algorithm.hpp create mode 100644 include/boost/fiber/properties.hpp create mode 100644 include/boost/fiber/round_robin.hpp create mode 100644 src/algorithm.cpp create mode 100644 src/properties.cpp create mode 100644 src/round_robin.cpp diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index b366915b..5d5bb816 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -27,7 +27,8 @@ project boost/fiber ; lib boost_fiber - : barrier.cpp + : algorithm.cpp + barrier.cpp condition.cpp context.cpp detail/spinlock.cpp @@ -35,8 +36,10 @@ lib boost_fiber future.cpp interruption.cpp mutex.cpp + properties.cpp recursive_mutex.cpp recursive_timed_mutex.cpp + round_robin.cpp timed_mutex.cpp scheduler.cpp : shared:../../context/build//boost_context diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp new file mode 100644 index 00000000..0487a178 --- /dev/null +++ b/include/boost/fiber/algorithm.hpp @@ -0,0 +1,107 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ALGORITHM_H +#define BOOST_FIBERS_ALGORITHM_H + +#include + +#include +#include + +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +class context; + +struct BOOST_FIBERS_DECL sched_algorithm { + virtual ~sched_algorithm() {} + + virtual void awakened( context *) = 0; + + virtual context * pick_next() = 0; + + virtual bool has_ready_fibers() const noexcept = 0; +}; + +class BOOST_FIBERS_DECL sched_algorithm_with_properties_base : public sched_algorithm { +public: + // called by fiber_properties::notify() -- don't directly call + virtual void property_change_( context * f, fiber_properties * props) = 0; + +protected: + static fiber_properties* get_properties( context * f) noexcept; + static void set_properties( context * f, fiber_properties * p) noexcept; +}; + +template< typename PROPS > +struct sched_algorithm_with_properties : public sched_algorithm_with_properties_base { + typedef sched_algorithm_with_properties_base super; + + // Mark this override 'final': sched_algorithm_with_properties subclasses + // must override awakened() with properties parameter instead. Otherwise + // you'd have to remember to start every subclass awakened() override + // with: sched_algorithm_with_properties::awakened(fb); + virtual void awakened( context * f) final { + fiber_properties * props = super::get_properties( f); + if ( ! props) { + // TODO: would be great if PROPS could be allocated on the new + // fiber's stack somehow + props = new_properties( f); + // It is not good for new_properties() to return 0. + BOOST_ASSERT_MSG(props, "new_properties() must return non-NULL"); + // new_properties() must return instance of (a subclass of) PROPS + BOOST_ASSERT_MSG(dynamic_cast(props), + "new_properties() must return properties class"); + super::set_properties( f, props); + } + // Set sched_algo_ again every time this fiber becomes READY. That + // handles the case of a fiber migrating to a new thread with a new + // sched_algorithm subclass instance. + props->set_sched_algorithm( this); + + // Okay, now forward the call to subclass override. + awakened( f, properties(f) ); + } + + // subclasses override this method instead of the original awakened() + virtual void awakened( context *, PROPS& ) = 0; + + // used for all internal calls + PROPS& properties( context * f) { + return static_cast< PROPS & >( * super::get_properties( f) ); + } + + // override this to be notified by PROPS::notify() + virtual void property_change( context * f, PROPS & props) { + } + + // implementation for sched_algorithm_with_properties_base method + void property_change_( context * f, fiber_properties * props ) final { + property_change( f, * static_cast< PROPS * >( props) ); + } + + // Override this to customize instantiation of PROPS, e.g. use a different + // allocator. Each PROPS instance is associated with a particular + // context. + virtual fiber_properties * new_properties( context * f) { + return new PROPS( f); + } +}; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ALGORITHM_H diff --git a/include/boost/fiber/all.hpp b/include/boost/fiber/all.hpp index 7ab008e6..cc1a3cee 100644 --- a/include/boost/fiber/all.hpp +++ b/include/boost/fiber/all.hpp @@ -7,6 +7,7 @@ #ifndef BOOST_FIBERS_H #define BOOST_FIBERS_H +#include #include #include #include diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index aaebba98..371c9190 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -184,6 +185,7 @@ private: fss_data_t fss_data_; wait_queue_t wait_queue_; detail::spinlock splk_; + fiber_properties * properties_; void set_terminated_() noexcept; @@ -299,7 +301,8 @@ public: tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), - splk_() { + splk_(), + properties_( nullptr) { // switch for initialization ctx_(); } @@ -368,6 +371,12 @@ public: void * data, bool cleanup_existing); + void set_properties( fiber_properties * props); + + fiber_properties * get_properties() const noexcept { + return properties_; + } + bool worker_is_linked(); bool terminated_is_linked(); diff --git a/include/boost/fiber/fiber.hpp b/include/boost/fiber/fiber.hpp index 98fc9563..40297648 100644 --- a/include/boost/fiber/fiber.hpp +++ b/include/boost/fiber/fiber.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -105,6 +106,13 @@ public: void interrupt() noexcept; void detach(); + + template< typename PROPS > + PROPS & properties() { + fiber_properties * props = impl_->get_properties(); + BOOST_ASSERT_MSG(props, "fiber::properties not set"); + return dynamic_cast< PROPS & >( * props ); + } }; inline diff --git a/include/boost/fiber/operations.hpp b/include/boost/fiber/operations.hpp index b9c64e5f..11ffd1e5 100644 --- a/include/boost/fiber/operations.hpp +++ b/include/boost/fiber/operations.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -50,6 +51,43 @@ void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) { interruption_point(); } +template< typename PROPS > +PROPS & properties() { + fibers::fiber_properties * props = + fibers::context::active()->get_properties(); + if ( ! props) { + // props could be nullptr if the thread's main fiber has not yet + // yielded (not yet passed through sched_algorithm_with_properties:: + // awakened()). Address that by yielding right now. + yield(); + // Try again to obtain the fiber_properties subclass instance ptr. + // Walk through the whole chain again because who knows WHAT might + // have happened while we were yielding! + props = fibers::context::active()->get_properties(); + // Could still be hosed if the running manager isn't a subclass of + // sched_algorithm_with_properties. + BOOST_ASSERT_MSG(props, "this_fiber::properties not set"); + } + return dynamic_cast< PROPS & >( * props ); +} + +} + +namespace fibers { + +inline +bool has_ready_fibers() { + return boost::fibers::context::active()->get_scheduler()->has_ready_fibers(); +} + +template< typename SchedAlgo, typename ... Args > +void use_scheduling_algorithm( Args && ... args) { + boost::fibers::context::active()->get_scheduler() + ->set_sched_algo( + std::make_unique< SchedAlgo >( + std::forward< Args >( args) ... ) ); +} + }} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/include/boost/fiber/properties.hpp b/include/boost/fiber/properties.hpp new file mode 100644 index 00000000..0d15be70 --- /dev/null +++ b/include/boost/fiber/properties.hpp @@ -0,0 +1,76 @@ +// Copyright Nat Goodspeed 2014. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Define fiber_properties, a base class from which a library consumer can +// derive a subclass with specific properties important to a user-coded +// scheduler. + +#ifndef BOOST_FIBERS_PROPERTIES_HPP +#define BOOST_FIBERS_PROPERTIES_HPP + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +# if defined(BOOST_MSVC) +# pragma warning(push) +# pragma warning(disable:4275) +# endif + +namespace boost { +namespace fibers { + +struct sched_algorithm; +class context; + +class BOOST_FIBERS_DECL fiber_properties { +protected: + // initialized by constructor + context * ctx_; + // set every time this fiber becomes READY + sched_algorithm * sched_algo_; + + // Inform the relevant sched_algorithm instance that something important + // has changed, so it can (presumably) adjust its data structures + // accordingly. + void notify(); + +public: + // Any specific property setter method, after updating the relevant + // instance variable, can/should call notify(). + + // fiber_properties, and by implication every subclass, must accept a back + // pointer to its context. + fiber_properties( context * ctx): + ctx_( ctx), + sched_algo_( nullptr) { + } + + // We need a virtual destructor (hence a vtable) because fiber_properties + // is stored polymorphically (as fiber_properties*) in context, and + // destroyed via that pointer. + virtual ~fiber_properties() { + } + + // not really intended for public use, but sched_algorithm_with_properties + // must be able to call this + void set_sched_algorithm( sched_algorithm * algo) { + sched_algo_ = algo; + } +}; + +}} // namespace boost::fibers + +# if defined(BOOST_MSVC) +# pragma warning(pop) +# endif + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_PROPERTIES_HPP diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp new file mode 100644 index 00000000..22034e74 --- /dev/null +++ b/include/boost/fiber/round_robin.hpp @@ -0,0 +1,43 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_DEFAULT_ROUND_ROBIN_H +#define BOOST_FIBERS_DEFAULT_ROUND_ROBIN_H + +#include + +#include +#include +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +class context; + +class BOOST_FIBERS_DECL round_robin : public sched_algorithm { +private: + scheduler::ready_queue_t ready_queue_; + +public: + virtual void awakened( context *); + + virtual context * pick_next(); + + virtual bool has_ready_fibers() const noexcept; +}; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DEFAULT_ROUND_ROBIN_H diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 2402bcaf..aec58789 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -7,12 +7,14 @@ #define BOOST_FIBERS_FIBER_MANAGER_H #include +#include #include #include #include #include +#include #include #include #include @@ -26,7 +28,7 @@ namespace boost { namespace fibers { class BOOST_FIBERS_DECL scheduler { -private: +public: struct timepoint_less { bool operator()( context const& l, context const& r) { return l.tp_ < r.tp_; @@ -64,25 +66,25 @@ private: & context::worker_hook_ >, intrusive::constant_time_size< false > > worker_queue_t; - context * main_ctx_; - intrusive_ptr< context > dispatcher_ctx_; +private: + std::unique_ptr< sched_algorithm > sched_algo_; + context * main_ctx_; + intrusive_ptr< context > dispatcher_ctx_; // worker-queue contains all context' mananged by this scheduler // except main-context and dispatcher-context // unlink happens on destruction of a context - worker_queue_t worker_queue_; + worker_queue_t worker_queue_; // terminated-queue contains context' which have been terminated - terminated_queue_t terminated_queue_; - // ready-queue contains context' ready to be resumed - ready_queue_t ready_queue_; + terminated_queue_t terminated_queue_; // remote ready-queue contains context' signaled by schedulers // running in other threads - remote_ready_queue_t remote_ready_queue_; + remote_ready_queue_t remote_ready_queue_; // sleep-queue cotnains context' whic hahve been called // scheduler::wait_until() - sleep_queue_t sleep_queue_; - bool shutdown_; - detail::autoreset_event ready_queue_ev_; - detail::spinlock remote_ready_splk_; + sleep_queue_t sleep_queue_; + bool shutdown_; + detail::autoreset_event ready_queue_ev_; + detail::spinlock remote_ready_splk_; void resume_( context *, context *); @@ -119,6 +121,10 @@ public: bool wait_until( context *, std::chrono::steady_clock::time_point const&) noexcept; void re_schedule( context *) noexcept; + + bool has_ready_fibers() const noexcept; + + void set_sched_algo( std::unique_ptr< sched_algorithm >); }; }} diff --git a/src/algorithm.cpp b/src/algorithm.cpp new file mode 100644 index 00000000..7fd8430c --- /dev/null +++ b/src/algorithm.cpp @@ -0,0 +1,34 @@ + +// Copyright Oliver Kowalke / Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "boost/fiber/algorithm.hpp" + +#include "boost/fiber/context.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +//static +fiber_properties * +sched_algorithm_with_properties_base::get_properties( context * ctx) noexcept { + return ctx->get_properties(); +} + +//static +void +sched_algorithm_with_properties_base::set_properties( context * ctx, fiber_properties * props) noexcept { + ctx->set_properties( props); +} + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/src/context.cpp b/src/context.cpp index b64faf2a..83dc7abd 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -68,7 +68,8 @@ context::context( main_context_t) : tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), - splk_() { + splk_(), + properties_( nullptr) { } // dispatcher fiber context @@ -94,7 +95,8 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), - splk_() { + splk_(), + properties_( nullptr) { } context::~context() { @@ -103,6 +105,7 @@ context::~context() { BOOST_ASSERT( ! remote_ready_is_linked() ); BOOST_ASSERT( ! sleep_is_linked() ); BOOST_ASSERT( ! wait_is_linked() ); + delete properties_; } void @@ -278,6 +281,12 @@ context::set_fss_data( void const * vp, } } +void +context::set_properties( fiber_properties * props) { + delete properties_; + properties_ = props; +} + bool context::worker_is_linked() { std::unique_lock< detail::spinlock > lk( hook_splk_); diff --git a/src/properties.cpp b/src/properties.cpp new file mode 100644 index 00000000..c7deca8b --- /dev/null +++ b/src/properties.cpp @@ -0,0 +1,40 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "boost/fiber/properties.hpp" + +#include + +#include "boost/fiber/algorithm.hpp" +#include "boost/fiber/scheduler.hpp" +#include "boost/fiber/context.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +void +fiber_properties::notify() { + BOOST_ASSERT( nullptr != sched_algo_); + // Application code might change an important property for any fiber at + // any time. The fiber in question might be ready, running or waiting. + // Significantly, only a fiber which is ready but not actually running is + // in the sched_algorithm's ready queue. Don't bother the sched_algorithm + // with a change to a fiber it's not currently tracking: it will do the + // right thing next time the fiber is passed to its awakened() method. + if ( ctx_->ready_is_linked() ) { + static_cast< sched_algorithm_with_properties_base * >( sched_algo_)-> + property_change_( ctx_, this); + } +} + +}} // boost::fibers + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/src/round_robin.cpp b/src/round_robin.cpp new file mode 100644 index 00000000..c21786a9 --- /dev/null +++ b/src/round_robin.cpp @@ -0,0 +1,47 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "boost/fiber/round_robin.hpp" + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +void +round_robin::awakened( context * ctx) { + BOOST_ASSERT( nullptr != ctx); + + BOOST_ASSERT( ! ctx->ready_is_linked() ); + ctx->ready_link( ready_queue_); +} + +context * +round_robin::pick_next() { + context * victim( nullptr); + if ( ! ready_queue_.empty() ) { + victim = & ready_queue_.front(); + ready_queue_.pop_front(); + BOOST_ASSERT( nullptr != victim); + BOOST_ASSERT( ! victim->ready_is_linked() ); + } + return victim; +} + +bool +round_robin::has_ready_fibers() const noexcept { + return ! ready_queue_.empty(); +} + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/src/scheduler.cpp b/src/scheduler.cpp index ff8d249b..ba045b4e 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -13,6 +13,7 @@ #include "boost/fiber/context.hpp" #include "boost/fiber/exceptions.hpp" +#include "boost/fiber/round_robin.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -31,6 +32,8 @@ scheduler::resume_( context * active_ctx, context * ctx) { BOOST_ASSERT( main_ctx_ == ctx || dispatcher_ctx_.get() == ctx || ctx->worker_is_linked() ); + BOOST_ASSERT( this == active_ctx->get_scheduler() ); + BOOST_ASSERT( this == ctx->get_scheduler() ); BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() ); // fiber next-to-run is same as current active-fiber // this might happen in context of this_fiber::yield() @@ -52,12 +55,7 @@ scheduler::resume_( context * active_ctx, context * ctx) { context * scheduler::get_next_() noexcept { - context * ctx( nullptr); - if ( ! ready_queue_.empty() ) { - ctx = & ready_queue_.front(); - ready_queue_.pop_front(); - } - return ctx; + return sched_algo_->pick_next(); } void @@ -121,7 +119,7 @@ scheduler::sleep2ready_() noexcept { // reset sleep-tp ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); // push new context to ready-queue - ctx->ready_link( ready_queue_); + sched_algo_->awakened( ctx); } else { break; // first context with now < deadline } @@ -129,11 +127,11 @@ scheduler::sleep2ready_() noexcept { } scheduler::scheduler() noexcept : + sched_algo_( new round_robin() ), main_ctx_( nullptr), dispatcher_ctx_(), worker_queue_(), terminated_queue_(), - ready_queue_(), remote_ready_queue_(), sleep_queue_(), shutdown_( false), @@ -152,7 +150,7 @@ scheduler::~scheduler() noexcept { // no context' in worker-queue BOOST_ASSERT( worker_queue_.empty() ); BOOST_ASSERT( terminated_queue_.empty() ); - BOOST_ASSERT( ready_queue_.empty() ); + BOOST_ASSERT( ! sched_algo_->has_ready_fibers() ); BOOST_ASSERT( remote_ready_queue_.empty() ); BOOST_ASSERT( sleep_queue_.empty() ); // deallocate dispatcher-context @@ -187,7 +185,7 @@ scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noex // the dispatcher-context is resumed and // scheduler::dispatch() is executed dispatcher_ctx_->set_scheduler( this); - dispatcher_ctx_->ready_link( ready_queue_); + sched_algo_->awakened( dispatcher_ctx_.get() ); } void @@ -226,7 +224,7 @@ scheduler::dispatch() { } // push dispatcher-context to ready-queue // so that ready-queue never becomes empty - dispatcher_ctx_->ready_link( ready_queue_); + sched_algo_->awakened( dispatcher_ctx_.get() ); resume_( dispatcher_ctx_.get(), ctx); BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } @@ -278,7 +276,7 @@ scheduler::set_ready( context * ctx) noexcept { // signaled to interrupt if ( ! ctx->ready_is_linked() ) { // push new context to ready-queue - ctx->ready_link( ready_queue_); + sched_algo_->awakened( ctx); } } @@ -328,7 +326,7 @@ scheduler::yield( context * active_ctx) noexcept { // context::wait_is_linked() is not sychronized // with other threads // push active context to ready-queue - active_ctx->ready_link( ready_queue_); + sched_algo_->awakened( active_ctx); // resume another fiber resume_( active_ctx, get_next_() ); } @@ -374,6 +372,16 @@ scheduler::re_schedule( context * active_ctx) noexcept { resume_( active_ctx, get_next_() ); } +bool +scheduler::has_ready_fibers() const noexcept { + return sched_algo_->has_ready_fibers(); +} + +void +scheduler::set_sched_algo( std::unique_ptr< sched_algorithm > algo) { + sched_algo_ = std::move( algo); +} + }} #ifdef BOOST_HAS_ABI_HEADERS