diff --git a/examples/cpp03/migration/workstealing_round_robin.cpp b/examples/cpp03/migration/workstealing_round_robin.cpp index 0ad78ee4..78e56191 100644 --- a/examples/cpp03/migration/workstealing_round_robin.cpp +++ b/examples/cpp03/migration/workstealing_round_robin.cpp @@ -12,11 +12,22 @@ # include BOOST_ABI_PREFIX #endif +workstealing_round_robin::workstealing_round_robin(): + rhead_(0), + rtail_(&rhead_) +{} + void workstealing_round_robin::awakened( boost::fibers::fiber_base * f) { + // forward this call to base-class method + boost::fibers::sched_algorithm_with_properties::awakened(f); + boost::mutex::scoped_lock lk( mtx_); - rqueue_.push_back( f); + // append this fiber_base* to ready queue + BOOST_ASSERT(! f->nxt_); + *rtail_ = f; + rtail_ = &f->nxt_; } boost::fibers::fiber_base * @@ -24,10 +35,15 @@ workstealing_round_robin::pick_next() { boost::mutex::scoped_lock lk( mtx_); boost::fibers::fiber_base * f = 0; - if ( ! rqueue_.empty() ) + if ( rhead_ ) { - f = rqueue_.front(); - rqueue_.pop_front(); + f = rhead_; + // pop head item from ready queue + rhead_ = rhead_->nxt_; + f->nxt_ = 0; + // if that was the last item, reset tail_ + if (! rhead_) + rtail_ = &rhead_; } return f; } @@ -46,13 +62,31 @@ boost::fibers::fiber workstealing_round_robin::steal() BOOST_NOEXCEPT { boost::mutex::scoped_lock lk( mtx_); - boost::fibers::fiber_base * f = 0; - if ( ! rqueue_.empty() ) + + // Search the queue for the LAST fiber_base that's willing to migrate, + // in other words (! thread_affinity). + boost::fibers::fiber_base ** fp = &rhead_, ** found = 0; + for ( ; *fp; fp = &(*fp)->nxt_) { - f = rqueue_.back(); - rqueue_.pop_back(); + // do not consider any fiber whose thread_affinity is set + if (! properties(*fp).thread_affinity) + found = fp; } - return boost::fibers::fiber( f); + if (! found) + { + // either the queue is completely empty or all current entries have + // thread_affinity set + return boost::fibers::fiber(static_cast(0)); + } + // We found at least one fiber_base whose thread_affinity is NOT set; + // *found points to the last of these. Unlink and return it. + boost::fibers::fiber_base* ret = *found; + *found = ret->nxt_; + ret->nxt_ = 0; + // if that was the last item, reset tail_ + if (! *found) + rtail_ = &rhead_; + return boost::fibers::fiber(ret); } #ifdef BOOST_HAS_ABI_HEADERS diff --git a/examples/cpp03/migration/workstealing_round_robin.hpp b/examples/cpp03/migration/workstealing_round_robin.hpp index 7c70fd71..bf9dc1f5 100644 --- a/examples/cpp03/migration/workstealing_round_robin.hpp +++ b/examples/cpp03/migration/workstealing_round_robin.hpp @@ -11,11 +11,8 @@ #include #include -#include -#include -#include #include -#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -26,15 +23,31 @@ # pragma warning(disable:4251 4275) # endif -class workstealing_round_robin : public boost::fibers::sched_algorithm +struct affinity: public boost::fibers::fiber_properties +{ + affinity(boost::fibers::fiber_properties::back_ptr p): + fiber_properties(p), + // By default, assume a given fiber CAN migrate to another thread. + thread_affinity(false) + {} + + bool thread_affinity; +}; + +class workstealing_round_robin : + public boost::fibers::sched_algorithm_with_properties { private: - typedef std::deque< boost::fibers::fiber_base * > rqueue_t; - boost::mutex mtx_; - rqueue_t rqueue_; + + // We should package these as a queue class. Better yet, we should + // refactor one of our existing (intrusive) queue classes to support the + // required operations generically. But for now... + boost::fibers::fiber_base *rhead_, **rtail_; public: + workstealing_round_robin(); + virtual void awakened( boost::fibers::fiber_base *); virtual boost::fibers::fiber_base * pick_next(); diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index 007b6cd1..80510d1a 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -71,9 +71,9 @@ public: } // used for all internal calls - PROPS& properties(detail::worker_fiber* f) + PROPS& properties(fiber_base* f) { - return static_cast(*f->get_properties()); + return static_cast(*static_cast(f)->get_properties()); } // public-facing properties(fiber::id) method in case consumer retains a diff --git a/include/boost/fiber/properties.hpp b/include/boost/fiber/properties.hpp index e13da8c3..5ae52eab 100644 --- a/include/boost/fiber/properties.hpp +++ b/include/boost/fiber/properties.hpp @@ -45,9 +45,11 @@ protected: public: // fiber_properties, and by implication every subclass, must accept a back - // pointer to its worker_fiber. Any specific property setter method, after - // updating the relevant instance variable, can/should call notify(). - fiber_properties(detail::worker_fiber* f): + // pointer to its worker_fiber. + typedef detail::worker_fiber* back_ptr; + // Any specific property setter method, after updating the relevant + // instance variable, can/should call notify(). + fiber_properties(back_ptr f): fiber_(f), sched_algo_(0) {}