2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-11 23:52:29 +00:00

Re-add thread_affinity specific to workstealing_round_robin.

thread_affinity is a good example of a property relevant only to a particular
sched_algorithm implementation. In examples/cpp03/migration, introduce an
'affinity' subclass of fiber_properties with a thread_affinity data
member.

Derive workstealing_round_robin from sched_algorithm_with_properties<affinity>
and, as required by that base class, forward awakened() calls to base-class
awakened() method.

Reimplement workstealing_round_robin's queue, replacing the std::deque with a
hand-rolled intrusive singly-linked list so we can efficiently remove an
arbitrary item.
Make the steal() method, instead of always popping the last item, scan the list
to find the last item willing to migrate (i.e. with thread_affinity unset).

From examples/cpp03/migration/workstealing_round_robin.hpp, an example of a
user-supplied sched_algorithm implementation, remove all boost/fiber/detail
#includes. These should no longer be needed.

Change sched_algorithm_with_properties::properties(worker_fiber*) method to
accept fiber_base* instead. The original signature was introduced when every
sched_algorithm implementation necessarily manipulated worker_fiber* pointers.
Now we're intentionally avoiding the need.

For the same reason, introduce a fiber_properties::back_ptr typedef so
subclasses can opaquely pass such pointers through their own constructor to
the base-class constructor.
This commit is contained in:
Nat Goodspeed
2014-11-11 16:15:25 -05:00
parent 3cb5b2a341
commit 757d692cae
4 changed files with 71 additions and 22 deletions

View File

@@ -12,11 +12,22 @@
# include BOOST_ABI_PREFIX
#endif
workstealing_round_robin::workstealing_round_robin():
rhead_(0),
rtail_(&rhead_)
{}
void
workstealing_round_robin::awakened( boost::fibers::fiber_base * f)
{
// forward this call to base-class method
boost::fibers::sched_algorithm_with_properties<affinity>::awakened(f);
boost::mutex::scoped_lock lk( mtx_);
rqueue_.push_back( f);
// append this fiber_base* to ready queue
BOOST_ASSERT(! f->nxt_);
*rtail_ = f;
rtail_ = &f->nxt_;
}
boost::fibers::fiber_base *
@@ -24,10 +35,15 @@ workstealing_round_robin::pick_next()
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::fiber_base * f = 0;
if ( ! rqueue_.empty() )
if ( rhead_ )
{
f = rqueue_.front();
rqueue_.pop_front();
f = rhead_;
// pop head item from ready queue
rhead_ = rhead_->nxt_;
f->nxt_ = 0;
// if that was the last item, reset tail_
if (! rhead_)
rtail_ = &rhead_;
}
return f;
}
@@ -46,13 +62,31 @@ boost::fibers::fiber
workstealing_round_robin::steal() BOOST_NOEXCEPT
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::fiber_base * f = 0;
if ( ! rqueue_.empty() )
// Search the queue for the LAST fiber_base that's willing to migrate,
// in other words (! thread_affinity).
boost::fibers::fiber_base ** fp = &rhead_, ** found = 0;
for ( ; *fp; fp = &(*fp)->nxt_)
{
f = rqueue_.back();
rqueue_.pop_back();
// do not consider any fiber whose thread_affinity is set
if (! properties(*fp).thread_affinity)
found = fp;
}
return boost::fibers::fiber( f);
if (! found)
{
// either the queue is completely empty or all current entries have
// thread_affinity set
return boost::fibers::fiber(static_cast<boost::fibers::fiber_base*>(0));
}
// We found at least one fiber_base whose thread_affinity is NOT set;
// *found points to the last of these. Unlink and return it.
boost::fibers::fiber_base* ret = *found;
*found = ret->nxt_;
ret->nxt_ = 0;
// if that was the last item, reset tail_
if (! *found)
rtail_ = &rhead_;
return boost::fibers::fiber(ret);
}
#ifdef BOOST_HAS_ABI_HEADERS

View File

@@ -11,11 +11,8 @@
#include <boost/config.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/fifo.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/fiber_manager.hpp>
#include <boost/fiber/properties.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -26,15 +23,31 @@
# pragma warning(disable:4251 4275)
# endif
class workstealing_round_robin : public boost::fibers::sched_algorithm
struct affinity: public boost::fibers::fiber_properties
{
affinity(boost::fibers::fiber_properties::back_ptr p):
fiber_properties(p),
// By default, assume a given fiber CAN migrate to another thread.
thread_affinity(false)
{}
bool thread_affinity;
};
class workstealing_round_robin :
public boost::fibers::sched_algorithm_with_properties<affinity>
{
private:
typedef std::deque< boost::fibers::fiber_base * > rqueue_t;
boost::mutex mtx_;
rqueue_t rqueue_;
// We should package these as a queue class. Better yet, we should
// refactor one of our existing (intrusive) queue classes to support the
// required operations generically. But for now...
boost::fibers::fiber_base *rhead_, **rtail_;
public:
workstealing_round_robin();
virtual void awakened( boost::fibers::fiber_base *);
virtual boost::fibers::fiber_base * pick_next();

View File

@@ -71,9 +71,9 @@ public:
}
// used for all internal calls
PROPS& properties(detail::worker_fiber* f)
PROPS& properties(fiber_base* f)
{
return static_cast<PROPS&>(*f->get_properties());
return static_cast<PROPS&>(*static_cast<detail::worker_fiber*>(f)->get_properties());
}
// public-facing properties(fiber::id) method in case consumer retains a

View File

@@ -45,9 +45,11 @@ protected:
public:
// fiber_properties, and by implication every subclass, must accept a back
// pointer to its worker_fiber. Any specific property setter method, after
// updating the relevant instance variable, can/should call notify().
fiber_properties(detail::worker_fiber* f):
// pointer to its worker_fiber.
typedef detail::worker_fiber* back_ptr;
// Any specific property setter method, after updating the relevant
// instance variable, can/should call notify().
fiber_properties(back_ptr f):
fiber_(f),
sched_algo_(0)
{}