From 757d692cae0d2acdcae623abb60014f2fb946475 Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 11 Nov 2014 16:15:25 -0500 Subject: [PATCH] Re-add thread_affinity specific to workstealing_round_robin. thread_affinity is a good example of a property relevant only to a particular sched_algorithm implementation. In examples/cpp03/migration, introduce an 'affinity' subclass of fiber_properties with a thread_affinity data member. Derive workstealing_round_robin from sched_algorithm_with_properties and, as required by that base class, forward awakened() calls to base-class awakened() method. Reimplement workstealing_round_robin's queue from a std::deque to a "by hand" intrusive singly-linked list so we can efficiently remove an arbitrary item. Make steal() method, instead of always popping the last item, scan the list to find the last item willing to migrate (! thread_affinity). From examples/cpp03/migration/workstealing_round_robin.hpp, an example of a user-supplied sched_algorithm implementation, remove all boost/fiber/detail #includes. These should no longer be needed. Change sched_algorithm_with_properties::properties(worker_fiber*) method to accept fiber_base* instead. The original signature was introduced when every sched_algorithm implementation necessarily manipulated worker_fiber* pointers. Now we're intentionally avoiding the need. For the same reason, introduce a fiber_properties::back_ptr typedef so subclasses can opaquely pass such pointers through their own constructor to the base-class constructor. --- .../migration/workstealing_round_robin.cpp | 52 +++++++++++++++---- .../migration/workstealing_round_robin.hpp | 29 ++++++++--- include/boost/fiber/algorithm.hpp | 4 +- include/boost/fiber/properties.hpp | 8 +-- 4 files changed, 71 insertions(+), 22 deletions(-) diff --git a/examples/cpp03/migration/workstealing_round_robin.cpp b/examples/cpp03/migration/workstealing_round_robin.cpp index 0ad78ee4..78e56191 100644 --- a/examples/cpp03/migration/workstealing_round_robin.cpp +++ b/examples/cpp03/migration/workstealing_round_robin.cpp @@ -12,11 +12,22 @@ # include BOOST_ABI_PREFIX #endif +workstealing_round_robin::workstealing_round_robin(): + rhead_(0), + rtail_(&rhead_) +{} + void workstealing_round_robin::awakened( boost::fibers::fiber_base * f) { + // forward this call to base-class method + boost::fibers::sched_algorithm_with_properties::awakened(f); + boost::mutex::scoped_lock lk( mtx_); - rqueue_.push_back( f); + // append this fiber_base* to ready queue + BOOST_ASSERT(! f->nxt_); + *rtail_ = f; + rtail_ = &f->nxt_; } boost::fibers::fiber_base * @@ -24,10 +35,15 @@ workstealing_round_robin::pick_next() { boost::mutex::scoped_lock lk( mtx_); boost::fibers::fiber_base * f = 0; - if ( ! rqueue_.empty() ) + if ( rhead_ ) { - f = rqueue_.front(); - rqueue_.pop_front(); + f = rhead_; + // pop head item from ready queue + rhead_ = rhead_->nxt_; + f->nxt_ = 0; + // if that was the last item, reset tail_ + if (! rhead_) + rtail_ = &rhead_; } return f; } @@ -46,13 +62,31 @@ boost::fibers::fiber workstealing_round_robin::steal() BOOST_NOEXCEPT { boost::mutex::scoped_lock lk( mtx_); - boost::fibers::fiber_base * f = 0; - if ( ! rqueue_.empty() ) + + // Search the queue for the LAST fiber_base that's willing to migrate, + // in other words (! thread_affinity). + boost::fibers::fiber_base ** fp = &rhead_, ** found = 0; + for ( ; *fp; fp = &(*fp)->nxt_) { - f = rqueue_.back(); - rqueue_.pop_back(); + // do not consider any fiber whose thread_affinity is set + if (! properties(*fp).thread_affinity) + found = fp; } - return boost::fibers::fiber( f); + if (! found) + { + // either the queue is completely empty or all current entries have + // thread_affinity set + return boost::fibers::fiber(static_cast(0)); + } + // We found at least one fiber_base whose thread_affinity is NOT set; + // *found points to the last of these. Unlink and return it. + boost::fibers::fiber_base* ret = *found; + *found = ret->nxt_; + ret->nxt_ = 0; + // if that was the last item, reset tail_ + if (! *found) + rtail_ = &rhead_; + return boost::fibers::fiber(ret); } #ifdef BOOST_HAS_ABI_HEADERS diff --git a/examples/cpp03/migration/workstealing_round_robin.hpp b/examples/cpp03/migration/workstealing_round_robin.hpp index 7c70fd71..bf9dc1f5 100644 --- a/examples/cpp03/migration/workstealing_round_robin.hpp +++ b/examples/cpp03/migration/workstealing_round_robin.hpp @@ -11,11 +11,8 @@ #include #include -#include -#include -#include #include -#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -26,15 +23,31 @@ # pragma warning(disable:4251 4275) # endif -class workstealing_round_robin : public boost::fibers::sched_algorithm +struct affinity: public boost::fibers::fiber_properties +{ + affinity(boost::fibers::fiber_properties::back_ptr p): + fiber_properties(p), + // By default, assume a given fiber CAN migrate to another thread. + thread_affinity(false) + {} + + bool thread_affinity; +}; + +class workstealing_round_robin : + public boost::fibers::sched_algorithm_with_properties { private: - typedef std::deque< boost::fibers::fiber_base * > rqueue_t; - boost::mutex mtx_; - rqueue_t rqueue_; + + // We should package these as a queue class. Better yet, we should + // refactor one of our existing (intrusive) queue classes to support the + // required operations generically. But for now... + boost::fibers::fiber_base *rhead_, **rtail_; public: + workstealing_round_robin(); + virtual void awakened( boost::fibers::fiber_base *); virtual boost::fibers::fiber_base * pick_next(); diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index 007b6cd1..80510d1a 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -71,9 +71,9 @@ public: } // used for all internal calls - PROPS& properties(detail::worker_fiber* f) + PROPS& properties(fiber_base* f) { - return static_cast(*f->get_properties()); + return static_cast(*static_cast(f)->get_properties()); } // public-facing properties(fiber::id) method in case consumer retains a diff --git a/include/boost/fiber/properties.hpp b/include/boost/fiber/properties.hpp index e13da8c3..5ae52eab 100644 --- a/include/boost/fiber/properties.hpp +++ b/include/boost/fiber/properties.hpp @@ -45,9 +45,11 @@ protected: public: // fiber_properties, and by implication every subclass, must accept a back - // pointer to its worker_fiber. Any specific property setter method, after - // updating the relevant instance variable, can/should call notify(). - fiber_properties(detail::worker_fiber* f): + // pointer to its worker_fiber. + typedef detail::worker_fiber* back_ptr; + // Any specific property setter method, after updating the relevant + // instance variable, can/should call notify(). + fiber_properties(back_ptr f): fiber_(f), sched_algo_(0) {}