diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index cd69bfa5..36bc1a89 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -41,6 +41,7 @@ lib boost_fiber future.cpp interruption.cpp mutex.cpp + properties.cpp recursive_mutex.cpp recursive_timed_mutex.cpp round_robin.cpp diff --git a/examples/cpp03/migration/workstealing_round_robin.cpp b/examples/cpp03/migration/workstealing_round_robin.cpp index 7617cc11..fb79afa1 100644 --- a/examples/cpp03/migration/workstealing_round_robin.cpp +++ b/examples/cpp03/migration/workstealing_round_robin.cpp @@ -12,47 +12,71 @@ # include BOOST_ABI_PREFIX #endif +workstealing_round_robin::workstealing_round_robin(): + rhead_(0), + rtail_(&rhead_) +{} + void -workstealing_round_robin::awakened( boost::fibers::detail::worker_fiber * f) +workstealing_round_robin::awakened( boost::fibers::fiber_base * f) { + // forward this call to base-class method + boost::fibers::sched_algorithm_with_properties::awakened(f); + boost::mutex::scoped_lock lk( mtx_); - rqueue_.push_back( f); + // append this fiber_base* to ready queue + BOOST_ASSERT(! f->nxt_); + *rtail_ = f; + rtail_ = &f->nxt_; } -boost::fibers::detail::worker_fiber * +boost::fibers::fiber_base * workstealing_round_robin::pick_next() { boost::mutex::scoped_lock lk( mtx_); - boost::fibers::detail::worker_fiber * f = 0; - if ( ! rqueue_.empty() ) + boost::fibers::fiber_base * f = 0; + if ( rhead_ ) { - f = rqueue_.front(); - rqueue_.pop_front(); + f = rhead_; + // pop head item from ready queue + rhead_ = rhead_->nxt_; + f->nxt_ = 0; + // if that was the last item, reset tail_ + if (! rhead_) + rtail_ = &rhead_; } return f; } -void -workstealing_round_robin::priority( boost::fibers::detail::worker_fiber * f, int prio) BOOST_NOEXCEPT -{ - BOOST_ASSERT( f); - - // set only priority to fiber - // round-robin does not respect priorities - f->priority( prio); -} - boost::fibers::fiber workstealing_round_robin::steal() BOOST_NOEXCEPT { boost::mutex::scoped_lock lk( mtx_); - boost::fibers::detail::worker_fiber * f = 0; - if ( ! rqueue_.empty() ) + + // Search the queue for the LAST fiber_base that's willing to migrate, + // in other words (! thread_affinity). + boost::fibers::fiber_base ** fp = &rhead_, ** found = 0; + for ( ; *fp; fp = &(*fp)->nxt_) { - f = rqueue_.back(); - rqueue_.pop_back(); + // do not consider any fiber whose thread_affinity is set + if (! properties(*fp).thread_affinity) + found = fp; } - return boost::fibers::fiber( f); + if (! found) + { + // either the queue is completely empty or all current entries have + // thread_affinity set + return boost::fibers::fiber(static_cast(0)); + } + // We found at least one fiber_base whose thread_affinity is NOT set; + // *found points to the last of these. Unlink and return it. + boost::fibers::fiber_base* ret = *found; + *found = ret->nxt_; + ret->nxt_ = 0; + // if that was the last item, reset tail_ + if (! *found) + rtail_ = &rhead_; + return boost::fibers::fiber(ret); } #ifdef BOOST_HAS_ABI_HEADERS diff --git a/examples/cpp03/migration/workstealing_round_robin.hpp b/examples/cpp03/migration/workstealing_round_robin.hpp index c9373c31..e1fb2d87 100644 --- a/examples/cpp03/migration/workstealing_round_robin.hpp +++ b/examples/cpp03/migration/workstealing_round_robin.hpp @@ -11,10 +11,8 @@ #include #include -#include -#include -#include -#include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -25,20 +23,34 @@ # pragma warning(disable:4251 4275) # endif -class workstealing_round_robin : public boost::fibers::sched_algorithm +struct affinity: public boost::fibers::fiber_properties +{ + affinity(boost::fibers::fiber_properties::back_ptr p): + fiber_properties(p), + // By default, assume a given fiber CAN migrate to another thread. + thread_affinity(false) + {} + + bool thread_affinity; +}; + +class workstealing_round_robin : + public boost::fibers::sched_algorithm_with_properties { private: - typedef std::deque< boost::fibers::detail::worker_fiber * > rqueue_t; - boost::mutex mtx_; - rqueue_t rqueue_; + + // We should package these as a queue class. Better yet, we should + // refactor one of our existing (intrusive) queue classes to support the + // required operations generically. But for now... + boost::fibers::fiber_base *rhead_, **rtail_; public: - virtual void awakened( boost::fibers::detail::worker_fiber *); + workstealing_round_robin(); - virtual boost::fibers::detail::worker_fiber * pick_next(); + virtual void awakened( boost::fibers::fiber_base *); - virtual void priority( boost::fibers::detail::worker_fiber *, int) BOOST_NOEXCEPT; + virtual boost::fibers::fiber_base * pick_next(); boost::fibers::fiber steal(); }; diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index 9eae1236..848436d7 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -24,15 +25,68 @@ namespace boost { namespace fibers { +// hoist fiber_base out of detail namespace into boost::fibers +typedef detail::fiber_base fiber_base; + struct sched_algorithm { virtual ~sched_algorithm() {} - virtual void awakened( detail::worker_fiber *) = 0; + virtual void awakened( fiber_base *) = 0; - virtual detail::worker_fiber * pick_next() = 0; + virtual fiber_base * pick_next() = 0; - virtual void priority( detail::worker_fiber *, int) BOOST_NOEXCEPT = 0; + virtual void property_change( fiber_base *, fiber_properties* ) {} +}; + +namespace detail { +// support sched_algorithm_with_properties::properties(fiber::id) +inline +fiber_base* extract_base(fiber_base::id id) { return id.impl_; } +} // detail + +template +struct sched_algorithm_with_properties: public sched_algorithm +{ +public: + typedef sched_algorithm_with_properties super; + + // Start every subclass awakened() override with: + // sched_algorithm_with_properties::awakened(fb); + virtual void awakened( fiber_base *fb) + { + detail::worker_fiber* f = static_cast(fb); + if (! f->get_properties()) + { + // TODO: would be great if PROPS could be allocated on the new + // fiber's stack somehow + f->set_properties(new PROPS(f)); + } + // Set sched_algo_ again every time this fiber becomes READY. That + // handles the case of a fiber migrating to a new thread with a new + // sched_algorithm subclass instance. + f->get_properties()->set_sched_algorithm(this); + } + + // used for all internal calls + PROPS& properties(fiber_base* f) + { + return static_cast(*static_cast(f)->get_properties()); + } + + // public-facing properties(fiber::id) method in case consumer retains a + // pointer to supplied sched_algorithm_with_properties subclass + PROPS& properties(detail::worker_fiber::id id) + { + return properties(extract(id)); + } + +private: + // support sched_algorithm_with_properties::properties(fiber::id) + detail::worker_fiber* extract(detail::worker_fiber::id id) + { + return static_cast(detail::extract_base(id)); + } }; }} diff --git a/include/boost/fiber/bounded_queue.hpp b/include/boost/fiber/bounded_queue.hpp index 1e484086..affa325a 100644 --- a/include/boost/fiber/bounded_queue.hpp +++ b/include/boost/fiber/bounded_queue.hpp @@ -93,7 +93,7 @@ private: state_t state_; std::size_t count_; typename node_type::ptr head_; - typename node_type::ptr tail_; + typename node_type::ptr * tail_; mutable mutex mtx_; condition not_empty_cond_; condition not_full_cond_; @@ -171,13 +171,8 @@ private: void push_tail_( typename node_type::ptr new_node) { - if ( is_empty_() ) - head_ = tail_ = new_node; - else - { - tail_->next = new_node; - tail_ = new_node; - } + *tail_ = new_node; + tail_ = &new_node->next; ++count_; } @@ -210,7 +205,7 @@ private: { typename node_type::ptr old_head = head_; head_ = old_head->next; - if ( 0 == head_) tail_ = 0; + if ( 0 == head_) tail_ = &head_; old_head->next = 0; return old_head; } @@ -222,7 +217,7 @@ public: state_( OPEN), count_( 0), head_(), - tail_( head_), + tail_( &head_), mtx_(), not_empty_cond_(), not_full_cond_(), @@ -240,7 +235,7 @@ public: state_( OPEN), count_( 0), head_(), - tail_( head_), + tail_( &head_), mtx_(), not_empty_cond_(), not_full_cond_(), diff --git a/include/boost/fiber/detail/fiber_base.hpp b/include/boost/fiber/detail/fiber_base.hpp index 00e11eb6..d4ff6457 100644 --- a/include/boost/fiber/detail/fiber_base.hpp +++ b/include/boost/fiber/detail/fiber_base.hpp @@ -39,6 +39,9 @@ public: private: fiber_base * impl_; + // support sched_algorithm_with_properties::properties(fiber::id) + friend fiber_base* extract_base(id); + public: id() BOOST_NOEXCEPT : impl_( 0) @@ -83,7 +86,8 @@ public: { return 0 == impl_; } }; - fiber_base() + fiber_base(): + nxt_(0) {} virtual ~fiber_base() {}; @@ -93,6 +97,9 @@ public: virtual void set_ready() BOOST_NOEXCEPT = 0; virtual id get_id() const BOOST_NOEXCEPT = 0; + + // for use by sched_algorithm to queue any subclass instance + fiber_base * nxt_; }; }}} diff --git a/include/boost/fiber/detail/fifo.hpp b/include/boost/fiber/detail/fifo.hpp index adf5aceb..df58baf5 100644 --- a/include/boost/fiber/detail/fifo.hpp +++ b/include/boost/fiber/detail/fifo.hpp @@ -30,34 +30,32 @@ class fifo : private noncopyable public: fifo() BOOST_NOEXCEPT : head_( 0), - tail_( 0) + tail_( &head_) {} bool empty() const BOOST_NOEXCEPT { return 0 == head_; } - void push( worker_fiber * item) BOOST_NOEXCEPT + void push( fiber_base * item) BOOST_NOEXCEPT { BOOST_ASSERT( 0 != item); - BOOST_ASSERT( 0 == item->next() ); + BOOST_ASSERT( 0 == item->nxt_ ); - if ( empty() ) - head_ = tail_ = item; - else - { - tail_->next( item); - tail_ = item; - } + // *tail_ holds the null marking the end of the fifo. So we can extend + // the fifo by assigning to *tail_. + *tail_ = item; + // Advance tail_ to point to the new end marker. + tail_ = &item->nxt_; } - worker_fiber * pop() BOOST_NOEXCEPT + fiber_base * pop() BOOST_NOEXCEPT { BOOST_ASSERT( ! empty() ); - worker_fiber * item = head_; - head_ = head_->next(); - if ( 0 == head_) tail_ = 0; - item->next_reset(); + fiber_base * item = head_; + head_ = head_->nxt_; + if ( 0 == head_) tail_ = &head_; + item->nxt_ = 0; return item; } @@ -68,8 +66,13 @@ public: } private: - worker_fiber * head_; - worker_fiber * tail_; + // head_ points to the head item, or is null + fiber_base * head_; + // tail_ points to the nxt_ field that contains the null that marks the + // end of the fifo. When the fifo is empty, tail_ points to head_. tail_ + // must never be null: it always points to a real fiber_base*. However, in + // normal use, (*tail_) is always null. + fiber_base ** tail_; }; }}} diff --git a/include/boost/fiber/detail/flags.hpp b/include/boost/fiber/detail/flags.hpp index 559990e7..b0ea7b60 100644 --- a/include/boost/fiber/detail/flags.hpp +++ b/include/boost/fiber/detail/flags.hpp @@ -23,8 +23,7 @@ enum flag_t { flag_interruption_blocked = 1 << 0, flag_interruption_requested = 1 << 1, - flag_thread_affinity = 1 << 2, - flag_detached = 1 << 3 + flag_detached = 1 << 2 }; }}} diff --git a/include/boost/fiber/detail/waiting_queue.hpp b/include/boost/fiber/detail/waiting_queue.hpp index e53f9632..886d4342 100644 --- a/include/boost/fiber/detail/waiting_queue.hpp +++ b/include/boost/fiber/detail/waiting_queue.hpp @@ -30,8 +30,7 @@ class waiting_queue : private noncopyable { public: waiting_queue() BOOST_NOEXCEPT : - head_( 0), - tail_( 0) + head_( 0) {} bool empty() const BOOST_NOEXCEPT @@ -40,55 +39,38 @@ public: void push( worker_fiber * item) BOOST_NOEXCEPT { BOOST_ASSERT( 0 != item); - BOOST_ASSERT( 0 == item->next() ); + BOOST_ASSERT( 0 == item->nxt_ ); - if ( empty() ) - head_ = tail_ = item; - else - { - worker_fiber * f = head_, * prev = 0; - do - { - worker_fiber * nxt = f->next(); - if ( item->time_point() <= f->time_point() ) - { - if ( head_ == f) - { - BOOST_ASSERT( 0 == prev); + // Skip past any worker_fibers in the queue whose time_point() is less + // than item->time_point(), looking for the first worker_fiber in the + // queue whose time_point() is at least item->time_point(). Insert + // item before that. In other words, insert item so as to preserve + // ascending order of time_point() values. (Recall that a worker_fiber + // waiting with no timeout uses the maximum time_point value.) - item->next( f); - head_ = item; - } - else - { - BOOST_ASSERT( 0 != prev); + // We do this by walking the linked list of nxt_ fields with a + // fiber_base**. In other words, first we point to &head_, then to + // &head_->nxt_, then to &head_->nxt_->nxt_ and so forth. When we find + // the item with the right time_point(), we're already pointing to the + // fiber_base* that links it into the list. Insert item right there. - item->next( f); - prev->next( item); - } - break; - } - else if ( tail_ == f) - { - BOOST_ASSERT( 0 == nxt); + fiber_base** f = &head_; + for ( ; *f; f = &(*f)->nxt_) + if (item->time_point() <= (static_cast(*f))->time_point()) + break; - tail_->next( item); - tail_ = item; - break; - } - - prev = f; - f = nxt; - } - while ( 0 != f); - } + // Here, either we reached the end of the list (! *f) or we found a + // (*f) before which to insert 'item'. Break the link at *f and insert + // item. + item->nxt_ = *f; + *f = item; } worker_fiber * top() const BOOST_NOEXCEPT { BOOST_ASSERT( ! empty() ); - return head_; + return static_cast(head_); } template< typename SchedAlgo, typename Fn > @@ -96,49 +78,41 @@ public: { BOOST_ASSERT( sched_algo); - worker_fiber * f = head_, * prev = 0; chrono::high_resolution_clock::time_point now( chrono::high_resolution_clock::now() ); - while ( 0 != f) + + // Search the queue for every worker_fiber 'f' for which fn(f, now) + // returns true. Each time we find such a worker_fiber, unlink it from + // the queue and pass it to sched_algo->awakened(). + + // Search using a fiber_base**, starting at &head_. + for (fiber_base** fp = &head_; *fp; ) { - worker_fiber * nxt = f->next(); - if ( fn( f, now) ) + worker_fiber *f = static_cast(*fp); + + if (! fn(f, now)) { - if ( f == head_) - { - BOOST_ASSERT( 0 == prev); - - head_ = nxt; - if ( 0 == head_) - tail_ = 0; - } - else - { - BOOST_ASSERT( 0 != prev); - - if ( 0 == nxt) - tail_ = prev; - - prev->next( nxt); - } - f->next_reset(); - f->time_point_reset(); - sched_algo->awakened( f); + // If f does NOT meet caller's criteria, skip fp past it. + fp = &(*fp)->nxt_; } else - prev = f; - f = nxt; + { + // Here f satisfies our caller. Unlink it from the list. + *fp = (*fp)->nxt_; + f->nxt_ = 0; + // Pass the newly-unlinked worker_fiber* to sched_algo. + f->time_point_reset(); + sched_algo->awakened(f); + } } } void swap( waiting_queue & other) { std::swap( head_, other.head_); - std::swap( tail_, other.tail_); } private: - worker_fiber * head_; - worker_fiber * tail_; + fiber_base * head_; }; }}} diff --git a/include/boost/fiber/detail/worker_fiber.hpp b/include/boost/fiber/detail/worker_fiber.hpp index ed5c139b..ef9d03d1 100644 --- a/include/boost/fiber/detail/worker_fiber.hpp +++ b/include/boost/fiber/detail/worker_fiber.hpp @@ -41,6 +41,9 @@ namespace boost { namespace fibers { + +class fiber_properties; + namespace detail { namespace coro = boost::coroutines; @@ -89,16 +92,15 @@ private: atomic< std::size_t > use_count_; fss_data_t fss_data_; - worker_fiber * nxt_; chrono::high_resolution_clock::time_point tp_; coro_t::yield_type * callee_; coro_t::call_type caller_; atomic< state_t > state_; atomic< int > flags_; - atomic< int > priority_; exception_ptr except_; spinlock splk_; std::vector< worker_fiber * > waiting_; + fiber_properties * properties_; public: worker_fiber( coro_t::yield_type *); @@ -108,12 +110,6 @@ public: id get_id() const BOOST_NOEXCEPT { return id( const_cast< worker_fiber * >( this) ); } - int priority() const BOOST_NOEXCEPT - { return priority_; } - - void priority( int prio) BOOST_NOEXCEPT - { priority_ = prio; } - bool join( worker_fiber *); bool interruption_blocked() const BOOST_NOEXCEPT @@ -126,11 +122,6 @@ public: void request_interruption( bool req) BOOST_NOEXCEPT; - bool thread_affinity() const BOOST_NOEXCEPT - { return 0 != ( flags_.load() & flag_thread_affinity); } - - void thread_affinity( bool req) BOOST_NOEXCEPT; - bool is_terminated() const BOOST_NOEXCEPT { return TERMINATED == state_; } @@ -214,15 +205,6 @@ public: BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm } - worker_fiber * next() const BOOST_NOEXCEPT - { return nxt_; } - - void next( worker_fiber * nxt) BOOST_NOEXCEPT - { nxt_ = nxt; } - - void next_reset() BOOST_NOEXCEPT - { nxt_ = 0; } - chrono::high_resolution_clock::time_point const& time_point() const BOOST_NOEXCEPT { return tp_; } @@ -254,6 +236,13 @@ public: f->deallocate(); } } + + void set_properties( fiber_properties* props); + + fiber_properties* get_properties() + { + return properties_; + } }; }}} diff --git a/include/boost/fiber/fiber.hpp b/include/boost/fiber/fiber.hpp index ab995b92..0261e5f5 100644 --- a/include/boost/fiber/fiber.hpp +++ b/include/boost/fiber/fiber.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -107,8 +108,10 @@ public: impl_() {} - explicit fiber( detail::worker_fiber * impl) BOOST_NOEXCEPT : - impl_( impl) + // This fiber_base* is allowed to be 0 -- call joinable() before performing + // operations on such a fiber object! + explicit fiber( fiber_base * impl) BOOST_NOEXCEPT : + impl_( dynamic_cast(impl)) {} #ifdef BOOST_MSVC @@ -305,19 +308,17 @@ public: id get_id() const BOOST_NOEXCEPT { return 0 != impl_ ? impl_->get_id() : id(); } - int priority() const BOOST_NOEXCEPT; - - void priority( int) BOOST_NOEXCEPT; - - bool thread_affinity() const BOOST_NOEXCEPT; - - void thread_affinity( bool) BOOST_NOEXCEPT; - void detach() BOOST_NOEXCEPT; void join(); void interrupt() BOOST_NOEXCEPT; + + template + PROPS& properties() + { + return fm_properties(impl_); + } }; inline diff --git a/include/boost/fiber/fiber_manager.hpp b/include/boost/fiber/fiber_manager.hpp index e991f38e..5a8ff6a6 100644 --- a/include/boost/fiber/fiber_manager.hpp +++ b/include/boost/fiber/fiber_manager.hpp @@ -20,7 +20,7 @@ #include #include #include -#include +//#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -54,9 +54,9 @@ void fm_resume_( detail::worker_fiber *); void fm_set_sched_algo( sched_algorithm *); -void fm_spawn( detail::worker_fiber *); +sched_algorithm* fm_get_sched_algo_(); -void fm_priority( detail::worker_fiber *, int) BOOST_NOEXCEPT; +void fm_spawn( detail::worker_fiber *); void fm_wait_interval( chrono::high_resolution_clock::duration const&) BOOST_NOEXCEPT; template< typename Rep, typename Period > @@ -98,6 +98,21 @@ chrono::high_resolution_clock::time_point fm_next_wakeup(); void fm_migrate( detail::worker_fiber *); +// implementation for fiber::properties() +template < class PROPS > +PROPS& fm_properties( detail::worker_fiber * f ) +{ + return dynamic_cast&>(*fm_get_sched_algo_()) + .properties(f); +} + +// implementation for this_fiber::properties() +template < class PROPS > +PROPS& fm_properties() +{ + return fm_properties(fm_active()); +} + }} # if defined(BOOST_MSVC) diff --git a/include/boost/fiber/operations.hpp b/include/boost/fiber/operations.hpp index 4bd3724c..8c24bb2f 100644 --- a/include/boost/fiber/operations.hpp +++ b/include/boost/fiber/operations.hpp @@ -64,22 +64,13 @@ template< typename Rep, typename Period > void sleep_for( chrono::duration< Rep, Period > const& timeout_duration) { sleep_until( chrono::high_resolution_clock::now() + timeout_duration); } -inline -bool thread_affinity() BOOST_NOEXCEPT +template < class PROPS > +PROPS& properties() { - return 0 != fibers::fm_active() - ? fibers::fm_active()->thread_affinity() - : true; + return fibers::fm_properties(); } -inline -void thread_affinity( bool req) BOOST_NOEXCEPT -{ - if ( 0 != fibers::fm_active() ) - fibers::fm_active()->thread_affinity( req); -} - -} +} // this_fiber namespace fibers { diff --git a/include/boost/fiber/properties.hpp b/include/boost/fiber/properties.hpp new file mode 100644 index 00000000..5ae52eab --- /dev/null +++ b/include/boost/fiber/properties.hpp @@ -0,0 +1,80 @@ +// Copyright Nat Goodspeed 2014. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Define fiber_properties, a base class from which a library consumer can +// derive a subclass with specific properties important to a user-coded +// scheduler. + +#ifndef BOOST_FIBERS_PROPERTIES_HPP +#define BOOST_FIBERS_PROPERTIES_HPP + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +# if defined(BOOST_MSVC) +# pragma warning(push) +# pragma warning(disable:4275) +# endif + +namespace boost { +namespace fibers { + +namespace detail { +class worker_fiber; +} // detail + +struct sched_algorithm; + +class fiber_properties +{ +protected: + // initialized by constructor + detail::worker_fiber* fiber_; + // set every time this fiber becomes READY + sched_algorithm* sched_algo_; + + // Inform the relevant sched_algorithm instance that something important + // has changed, so it can (presumably) adjust its data structures + // accordingly. + void notify(); + +public: + // fiber_properties, and by implication every subclass, must accept a back + // pointer to its worker_fiber. + typedef detail::worker_fiber* back_ptr; + // Any specific property setter method, after updating the relevant + // instance variable, can/should call notify(). + fiber_properties(back_ptr f): + fiber_(f), + sched_algo_(0) + {} + + // We need a virtual destructor (hence a vtable) because fiber_properties + // is stored polymorphically (as fiber_properties*) in worker_fiber, and + // destroyed via that pointer. + ~fiber_properties() {} + + // not really intended for public use, but sched_algorithm_with_properties + // must be able to call this + void set_sched_algorithm(sched_algorithm* algo) + { + sched_algo_ = algo; + } +}; + +}} // namespace boost::fibers + +# if defined(BOOST_MSVC) +# pragma warning(pop) +# endif + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_PROPERTIES_HPP diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 09f7d5ef..6892a57f 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -33,11 +33,9 @@ private: rqueue_t rqueue_; public: - virtual void awakened( detail::worker_fiber *); + virtual void awakened( fiber_base *); - virtual detail::worker_fiber * pick_next(); - - virtual void priority( detail::worker_fiber *, int) BOOST_NOEXCEPT; + virtual fiber_base * pick_next(); }; }} diff --git a/include/boost/fiber/unbounded_queue.hpp b/include/boost/fiber/unbounded_queue.hpp index 20e08b40..a199d04d 100644 --- a/include/boost/fiber/unbounded_queue.hpp +++ b/include/boost/fiber/unbounded_queue.hpp @@ -90,7 +90,7 @@ private: state state_; typename node_type::ptr head_; - typename node_type::ptr tail_; + typename node_type::ptr * tail_; mutable mutex mtx_; condition not_empty_cond_; @@ -132,13 +132,8 @@ private: void push_tail_( typename node_type::ptr new_node) { - if ( is_empty_() ) - head_ = tail_ = new_node; - else - { - tail_->next = new_node; - tail_ = new_node; - } + *tail_ = new_node; + tail_ = &new_node->next; } value_type value_pop_() @@ -161,7 +156,7 @@ private: { typename node_type::ptr old_head = head_; head_ = old_head->next; - if ( 0 == head_) tail_ = 0; + if ( 0 == head_) tail_ = &head_; old_head->next = 0; return old_head; } @@ -170,7 +165,7 @@ public: unbounded_queue() : state_( OPEN), head_(), - tail_( head_), + tail_( &head_), mtx_(), not_empty_cond_() {} diff --git a/src/detail/worker_fiber.cpp b/src/detail/worker_fiber.cpp index 02de34ca..b5fa9338 100644 --- a/src/detail/worker_fiber.cpp +++ b/src/detail/worker_fiber.cpp @@ -14,6 +14,7 @@ #include "boost/fiber/detail/scheduler.hpp" #include "boost/fiber/exceptions.hpp" +#include "boost/fiber/properties.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -29,21 +30,22 @@ worker_fiber::worker_fiber( coro_t::yield_type * callee) : fiber_base(), use_count_( 1), // allocated on stack fss_data_(), - nxt_( 0), tp_( (chrono::high_resolution_clock::time_point::max)() ), callee_( callee), caller_(), state_( READY), flags_( 0), - priority_( 0), except_(), - waiting_() + waiting_(), + properties_(0) { BOOST_ASSERT( callee_); } worker_fiber::~worker_fiber() { BOOST_ASSERT( is_terminated() ); BOOST_ASSERT( waiting_.empty() ); + + delete properties_; } void @@ -95,15 +97,6 @@ worker_fiber::request_interruption( bool req) BOOST_NOEXCEPT flags_ &= ~flag_interruption_requested; } -void -worker_fiber::thread_affinity( bool req) BOOST_NOEXCEPT -{ - if ( req) - flags_ |= flag_thread_affinity; - else - flags_ &= ~flag_thread_affinity; -} - void * worker_fiber::get_fss_data( void const* vp) const { @@ -142,6 +135,13 @@ worker_fiber::set_fss_data( fss_data( data, cleanup_fn) ) ); } +void +worker_fiber::set_properties( fiber_properties* props) +{ + delete properties_; + properties_ = props; +} + }}} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/src/fiber.cpp b/src/fiber.cpp index f70b798c..77bd7a93 100644 --- a/src/fiber.cpp +++ b/src/fiber.cpp @@ -28,38 +28,6 @@ fiber::start_() fm_spawn( impl_.get() ); } -int -fiber::priority() const BOOST_NOEXCEPT -{ - BOOST_ASSERT( impl_); - - return impl_->priority(); -} - -void -fiber::priority( int prio) BOOST_NOEXCEPT -{ - BOOST_ASSERT( impl_); - - fm_priority( impl_.get(), prio); -} - -bool -fiber::thread_affinity() const BOOST_NOEXCEPT -{ - BOOST_ASSERT( impl_); - - return impl_->thread_affinity(); -} - -void -fiber::thread_affinity( bool req) BOOST_NOEXCEPT -{ - BOOST_ASSERT( impl_); - - impl_->thread_affinity( req); -} - void fiber::join() { diff --git a/src/fiber_manager.cpp b/src/fiber_manager.cpp index 6cf3601a..28609f7f 100644 --- a/src/fiber_manager.cpp +++ b/src/fiber_manager.cpp @@ -102,6 +102,14 @@ void fm_set_sched_algo( sched_algorithm * algo) fm->def_algo_.reset(); } +sched_algorithm* fm_get_sched_algo_() +{ + fiber_manager * fm = detail::scheduler::instance(); + + BOOST_ASSERT( 0 != fm); + return fm->sched_algo_; +} + chrono::high_resolution_clock::time_point fm_next_wakeup() { fiber_manager * fm = detail::scheduler::instance(); @@ -132,16 +140,6 @@ void fm_spawn( detail::worker_fiber * f) fm->sched_algo_->awakened( f); } -void fm_priority( detail::worker_fiber * f, - int prio) BOOST_NOEXCEPT -{ - fiber_manager * fm = detail::scheduler::instance(); - - BOOST_ASSERT( 0 != fm); - - fm->sched_algo_->priority( f, prio); -} - void fm_wait_interval( chrono::high_resolution_clock::duration const& wait_interval) BOOST_NOEXCEPT { fiber_manager * fm = detail::scheduler::instance(); @@ -166,17 +164,17 @@ void fm_run() BOOST_ASSERT( 0 != fm); - // move all fibers witch are ready (state_ready) + // move all fibers which are ready (state_ready) // from waiting-queue to the runnable-queue fm->wqueue_.move_to( fm->sched_algo_, fetch_ready); // pop new fiber from ready-queue which is not complete // (example: fiber in ready-queue could be canceled by active-fiber) - detail::worker_fiber * f( fm->sched_algo_->pick_next() ); + fiber_base * f( fm->sched_algo_->pick_next()); if ( f) { BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue"); - fm_resume_( f); + fm_resume_( static_cast(f)); } else { diff --git a/src/properties.cpp b/src/properties.cpp new file mode 100644 index 00000000..226c7441 --- /dev/null +++ b/src/properties.cpp @@ -0,0 +1,28 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include "boost/fiber/properties.hpp" +#include "boost/fiber/algorithm.hpp" +#include "boost/fiber/fiber_manager.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +void fiber_properties::notify() +{ + BOOST_ASSERT(sched_algo_); + sched_algo_->property_change(fiber_, this); +} + +}} // boost::fiber + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/src/round_robin.cpp b/src/round_robin.cpp index bf398da5..f8f019d1 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -16,32 +16,22 @@ namespace boost { namespace fibers { void -round_robin::awakened( detail::worker_fiber * f) +round_robin::awakened( fiber_base * f) { BOOST_ASSERT( 0 != f); rqueue_.push( f); } -detail::worker_fiber * +fiber_base * round_robin::pick_next() { - detail::worker_fiber * victim = 0; + fiber_base * victim = 0; if ( ! rqueue_.empty() ) victim = rqueue_.pop(); return victim; } -void -round_robin::priority( detail::worker_fiber * f, int prio) BOOST_NOEXCEPT -{ - BOOST_ASSERT( f); - - // set only priority to fiber - // round-robin does not respect priorities - f->priority( prio); -} - }} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/test/test_fiber.cpp b/test/test_fiber.cpp index 7df80941..1fde5f95 100644 --- a/test/test_fiber.cpp +++ b/test/test_fiber.cpp @@ -179,15 +179,6 @@ void test_move() } } -void test_priority() -{ - boost::fibers::fiber f( f1); - BOOST_CHECK_EQUAL( 0, f.priority() ); - f.priority( 7); - BOOST_CHECK_EQUAL( 7, f.priority() ); - f.join(); -} - void test_id() { boost::fibers::fiber s1; @@ -353,7 +344,6 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* []) test->add( BOOST_TEST_CASE( & test_move) ); test->add( BOOST_TEST_CASE( & test_id) ); - test->add( BOOST_TEST_CASE( & test_priority) ); test->add( BOOST_TEST_CASE( & test_detach) ); test->add( BOOST_TEST_CASE( & test_complete) ); test->add( BOOST_TEST_CASE( & test_join_in_thread) );