2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-12 12:02:54 +00:00

Nat Goodspeed:

- Change some doc references from 'algorithm' to 'sched_algorithm'.
- Initial cut at supporting arbitrary user-coded scheduler properties.
- Set fiber_properties::sched_algo_ every time through awakened().
- Define sched_algorithm methods on fiber_base*, not worker_fiber*.
- Simplify detail::fifo by making tail_ point to last link pointer.
- Reimplement waiting_queue::push() using pointer-to-pointer trick.
- Reimplement waiting_queue::move_to() using fiber_base** scan.
- Make bounded_queue::tail_ a ptr* to simplify appending new nodes.
- Make unbounded_queue::tail_ a ptr* to simplify linking new nodes.
- Remove thread_affinity flag and access methods.
- Re-add thread_affinity specific to workstealing_round_robin.
- Remove 'priority' for every fiber, and its support methods.
This commit is contained in:
Oliver Kowalke
2014-11-12 20:03:12 +01:00
parent 6a1257442b
commit 1e751d7dbf
22 changed files with 391 additions and 279 deletions

View File

@@ -41,6 +41,7 @@ lib boost_fiber
future.cpp
interruption.cpp
mutex.cpp
properties.cpp
recursive_mutex.cpp
recursive_timed_mutex.cpp
round_robin.cpp

View File

@@ -12,47 +12,71 @@
# include BOOST_ABI_PREFIX
#endif
workstealing_round_robin::workstealing_round_robin():
rhead_(0),
rtail_(&rhead_)
{}
void
workstealing_round_robin::awakened( boost::fibers::detail::worker_fiber * f)
workstealing_round_robin::awakened( boost::fibers::fiber_base * f)
{
// forward this call to base-class method
boost::fibers::sched_algorithm_with_properties<affinity>::awakened(f);
boost::mutex::scoped_lock lk( mtx_);
rqueue_.push_back( f);
// append this fiber_base* to ready queue
BOOST_ASSERT(! f->nxt_);
*rtail_ = f;
rtail_ = &f->nxt_;
}
boost::fibers::detail::worker_fiber *
boost::fibers::fiber_base *
workstealing_round_robin::pick_next()
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::detail::worker_fiber * f = 0;
if ( ! rqueue_.empty() )
boost::fibers::fiber_base * f = 0;
if ( rhead_ )
{
f = rqueue_.front();
rqueue_.pop_front();
f = rhead_;
// pop head item from ready queue
rhead_ = rhead_->nxt_;
f->nxt_ = 0;
// if that was the last item, reset tail_
if (! rhead_)
rtail_ = &rhead_;
}
return f;
}
void
workstealing_round_robin::priority( boost::fibers::detail::worker_fiber * f, int prio) BOOST_NOEXCEPT
{
BOOST_ASSERT( f);
// set only priority to fiber
// round-robin does not respect priorities
f->priority( prio);
}
boost::fibers::fiber
workstealing_round_robin::steal() BOOST_NOEXCEPT
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::detail::worker_fiber * f = 0;
if ( ! rqueue_.empty() )
// Search the queue for the LAST fiber_base that's willing to migrate,
// in other words (! thread_affinity).
boost::fibers::fiber_base ** fp = &rhead_, ** found = 0;
for ( ; *fp; fp = &(*fp)->nxt_)
{
f = rqueue_.back();
rqueue_.pop_back();
// do not consider any fiber whose thread_affinity is set
if (! properties(*fp).thread_affinity)
found = fp;
}
return boost::fibers::fiber( f);
if (! found)
{
// either the queue is completely empty or all current entries have
// thread_affinity set
return boost::fibers::fiber(static_cast<boost::fibers::fiber_base*>(0));
}
// We found at least one fiber_base whose thread_affinity is NOT set;
// *found points to the last of these. Unlink and return it.
boost::fibers::fiber_base* ret = *found;
*found = ret->nxt_;
ret->nxt_ = 0;
// if that was the last item, reset tail_
if (! *found)
rtail_ = &rhead_;
return boost::fibers::fiber(ret);
}
#ifdef BOOST_HAS_ABI_HEADERS

View File

@@ -11,10 +11,8 @@
#include <boost/config.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/fifo.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/fiber_manager.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/properties.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -25,20 +23,34 @@
# pragma warning(disable:4251 4275)
# endif
class workstealing_round_robin : public boost::fibers::sched_algorithm
struct affinity: public boost::fibers::fiber_properties
{
affinity(boost::fibers::fiber_properties::back_ptr p):
fiber_properties(p),
// By default, assume a given fiber CAN migrate to another thread.
thread_affinity(false)
{}
bool thread_affinity;
};
class workstealing_round_robin :
public boost::fibers::sched_algorithm_with_properties<affinity>
{
private:
typedef std::deque< boost::fibers::detail::worker_fiber * > rqueue_t;
boost::mutex mtx_;
rqueue_t rqueue_;
// We should package these as a queue class. Better yet, we should
// refactor one of our existing (intrusive) queue classes to support the
// required operations generically. But for now...
boost::fibers::fiber_base *rhead_, **rtail_;
public:
virtual void awakened( boost::fibers::detail::worker_fiber *);
workstealing_round_robin();
virtual boost::fibers::detail::worker_fiber * pick_next();
virtual void awakened( boost::fibers::fiber_base *);
virtual void priority( boost::fibers::detail::worker_fiber *, int) BOOST_NOEXCEPT;
virtual boost::fibers::fiber_base * pick_next();
boost::fibers::fiber steal();
};

View File

@@ -9,6 +9,7 @@
#include <boost/config.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/properties.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
@@ -24,15 +25,68 @@
namespace boost {
namespace fibers {
// hoist fiber_base out of detail namespace into boost::fibers
typedef detail::fiber_base fiber_base;
struct sched_algorithm
{
virtual ~sched_algorithm() {}
virtual void awakened( detail::worker_fiber *) = 0;
virtual void awakened( fiber_base *) = 0;
virtual detail::worker_fiber * pick_next() = 0;
virtual fiber_base * pick_next() = 0;
virtual void priority( detail::worker_fiber *, int) BOOST_NOEXCEPT = 0;
virtual void property_change( fiber_base *, fiber_properties* ) {}
};
namespace detail {
// support sched_algorithm_with_properties::properties(fiber::id)
inline
fiber_base* extract_base(fiber_base::id id) { return id.impl_; }
} // detail
template <class PROPS>
struct sched_algorithm_with_properties: public sched_algorithm
{
public:
typedef sched_algorithm_with_properties<PROPS> super;
// Start every subclass awakened() override with:
// sched_algorithm_with_properties<PROPS>::awakened(fb);
virtual void awakened( fiber_base *fb)
{
detail::worker_fiber* f = static_cast<detail::worker_fiber*>(fb);
if (! f->get_properties())
{
// TODO: would be great if PROPS could be allocated on the new
// fiber's stack somehow
f->set_properties(new PROPS(f));
}
// Set sched_algo_ again every time this fiber becomes READY. That
// handles the case of a fiber migrating to a new thread with a new
// sched_algorithm subclass instance.
f->get_properties()->set_sched_algorithm(this);
}
// used for all internal calls
PROPS& properties(fiber_base* f)
{
return static_cast<PROPS&>(*static_cast<detail::worker_fiber*>(f)->get_properties());
}
// public-facing properties(fiber::id) method in case consumer retains a
// pointer to supplied sched_algorithm_with_properties<PROPS> subclass
PROPS& properties(detail::worker_fiber::id id)
{
return properties(extract(id));
}
private:
// support sched_algorithm_with_properties::properties(fiber::id)
detail::worker_fiber* extract(detail::worker_fiber::id id)
{
return static_cast<detail::worker_fiber*>(detail::extract_base(id));
}
};
}}

View File

@@ -93,7 +93,7 @@ private:
state_t state_;
std::size_t count_;
typename node_type::ptr head_;
typename node_type::ptr tail_;
typename node_type::ptr * tail_;
mutable mutex mtx_;
condition not_empty_cond_;
condition not_full_cond_;
@@ -171,13 +171,8 @@ private:
void push_tail_( typename node_type::ptr new_node)
{
if ( is_empty_() )
head_ = tail_ = new_node;
else
{
tail_->next = new_node;
tail_ = new_node;
}
*tail_ = new_node;
tail_ = &new_node->next;
++count_;
}
@@ -210,7 +205,7 @@ private:
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
if ( 0 == head_) tail_ = 0;
if ( 0 == head_) tail_ = &head_;
old_head->next = 0;
return old_head;
}
@@ -222,7 +217,7 @@ public:
state_( OPEN),
count_( 0),
head_(),
tail_( head_),
tail_( &head_),
mtx_(),
not_empty_cond_(),
not_full_cond_(),
@@ -240,7 +235,7 @@ public:
state_( OPEN),
count_( 0),
head_(),
tail_( head_),
tail_( &head_),
mtx_(),
not_empty_cond_(),
not_full_cond_(),

View File

@@ -39,6 +39,9 @@ public:
private:
fiber_base * impl_;
// support sched_algorithm_with_properties::properties(fiber::id)
friend fiber_base* extract_base(id);
public:
id() BOOST_NOEXCEPT :
impl_( 0)
@@ -83,7 +86,8 @@ public:
{ return 0 == impl_; }
};
fiber_base()
fiber_base():
nxt_(0)
{}
virtual ~fiber_base() {};
@@ -93,6 +97,9 @@ public:
virtual void set_ready() BOOST_NOEXCEPT = 0;
virtual id get_id() const BOOST_NOEXCEPT = 0;
// for use by sched_algorithm to queue any subclass instance
fiber_base * nxt_;
};
}}}

View File

@@ -30,34 +30,32 @@ class fifo : private noncopyable
public:
fifo() BOOST_NOEXCEPT :
head_( 0),
tail_( 0)
tail_( &head_)
{}
bool empty() const BOOST_NOEXCEPT
{ return 0 == head_; }
void push( worker_fiber * item) BOOST_NOEXCEPT
void push( fiber_base * item) BOOST_NOEXCEPT
{
BOOST_ASSERT( 0 != item);
BOOST_ASSERT( 0 == item->next() );
BOOST_ASSERT( 0 == item->nxt_ );
if ( empty() )
head_ = tail_ = item;
else
{
tail_->next( item);
tail_ = item;
}
// *tail_ holds the null marking the end of the fifo. So we can extend
// the fifo by assigning to *tail_.
*tail_ = item;
// Advance tail_ to point to the new end marker.
tail_ = &item->nxt_;
}
worker_fiber * pop() BOOST_NOEXCEPT
fiber_base * pop() BOOST_NOEXCEPT
{
BOOST_ASSERT( ! empty() );
worker_fiber * item = head_;
head_ = head_->next();
if ( 0 == head_) tail_ = 0;
item->next_reset();
fiber_base * item = head_;
head_ = head_->nxt_;
if ( 0 == head_) tail_ = &head_;
item->nxt_ = 0;
return item;
}
@@ -68,8 +66,13 @@ public:
}
private:
worker_fiber * head_;
worker_fiber * tail_;
// head_ points to the head item, or is null
fiber_base * head_;
// tail_ points to the nxt_ field that contains the null that marks the
// end of the fifo. When the fifo is empty, tail_ points to head_. tail_
// must never be null: it always points to a real fiber_base*. However, in
// normal use, (*tail_) is always null.
fiber_base ** tail_;
};
}}}

View File

@@ -23,8 +23,7 @@ enum flag_t
{
flag_interruption_blocked = 1 << 0,
flag_interruption_requested = 1 << 1,
flag_thread_affinity = 1 << 2,
flag_detached = 1 << 3
flag_detached = 1 << 2
};
}}}

View File

@@ -30,8 +30,7 @@ class waiting_queue : private noncopyable
{
public:
waiting_queue() BOOST_NOEXCEPT :
head_( 0),
tail_( 0)
head_( 0)
{}
bool empty() const BOOST_NOEXCEPT
@@ -40,55 +39,38 @@ public:
void push( worker_fiber * item) BOOST_NOEXCEPT
{
BOOST_ASSERT( 0 != item);
BOOST_ASSERT( 0 == item->next() );
BOOST_ASSERT( 0 == item->nxt_ );
if ( empty() )
head_ = tail_ = item;
else
{
worker_fiber * f = head_, * prev = 0;
do
{
worker_fiber * nxt = f->next();
if ( item->time_point() <= f->time_point() )
{
if ( head_ == f)
{
BOOST_ASSERT( 0 == prev);
// Skip past any worker_fibers in the queue whose time_point() is less
// than item->time_point(), looking for the first worker_fiber in the
// queue whose time_point() is at least item->time_point(). Insert
// item before that. In other words, insert item so as to preserve
// ascending order of time_point() values. (Recall that a worker_fiber
// waiting with no timeout uses the maximum time_point value.)
item->next( f);
head_ = item;
}
else
{
BOOST_ASSERT( 0 != prev);
// We do this by walking the linked list of nxt_ fields with a
// fiber_base**. In other words, first we point to &head_, then to
// &head_->nxt_, then to &head_->nxt_->nxt_ and so forth. When we find
// the item with the right time_point(), we're already pointing to the
// fiber_base* that links it into the list. Insert item right there.
item->next( f);
prev->next( item);
}
break;
}
else if ( tail_ == f)
{
BOOST_ASSERT( 0 == nxt);
fiber_base** f = &head_;
for ( ; *f; f = &(*f)->nxt_)
if (item->time_point() <= (static_cast<worker_fiber*>(*f))->time_point())
break;
tail_->next( item);
tail_ = item;
break;
}
prev = f;
f = nxt;
}
while ( 0 != f);
}
// Here, either we reached the end of the list (! *f) or we found a
// (*f) before which to insert 'item'. Break the link at *f and insert
// item.
item->nxt_ = *f;
*f = item;
}
worker_fiber * top() const BOOST_NOEXCEPT
{
BOOST_ASSERT( ! empty() );
return head_;
return static_cast<worker_fiber*>(head_);
}
template< typename SchedAlgo, typename Fn >
@@ -96,49 +78,41 @@ public:
{
BOOST_ASSERT( sched_algo);
worker_fiber * f = head_, * prev = 0;
chrono::high_resolution_clock::time_point now( chrono::high_resolution_clock::now() );
while ( 0 != f)
// Search the queue for every worker_fiber 'f' for which fn(f, now)
// returns true. Each time we find such a worker_fiber, unlink it from
// the queue and pass it to sched_algo->awakened().
// Search using a fiber_base**, starting at &head_.
for (fiber_base** fp = &head_; *fp; )
{
worker_fiber * nxt = f->next();
if ( fn( f, now) )
worker_fiber *f = static_cast<worker_fiber*>(*fp);
if (! fn(f, now))
{
if ( f == head_)
{
BOOST_ASSERT( 0 == prev);
head_ = nxt;
if ( 0 == head_)
tail_ = 0;
}
else
{
BOOST_ASSERT( 0 != prev);
if ( 0 == nxt)
tail_ = prev;
prev->next( nxt);
}
f->next_reset();
f->time_point_reset();
sched_algo->awakened( f);
// If f does NOT meet caller's criteria, skip fp past it.
fp = &(*fp)->nxt_;
}
else
prev = f;
f = nxt;
{
// Here f satisfies our caller. Unlink it from the list.
*fp = (*fp)->nxt_;
f->nxt_ = 0;
// Pass the newly-unlinked worker_fiber* to sched_algo.
f->time_point_reset();
sched_algo->awakened(f);
}
}
}
void swap( waiting_queue & other)
{
std::swap( head_, other.head_);
std::swap( tail_, other.tail_);
}
private:
worker_fiber * head_;
worker_fiber * tail_;
fiber_base * head_;
};
}}}

View File

@@ -41,6 +41,9 @@
namespace boost {
namespace fibers {
class fiber_properties;
namespace detail {
namespace coro = boost::coroutines;
@@ -89,16 +92,15 @@ private:
atomic< std::size_t > use_count_;
fss_data_t fss_data_;
worker_fiber * nxt_;
chrono::high_resolution_clock::time_point tp_;
coro_t::yield_type * callee_;
coro_t::call_type caller_;
atomic< state_t > state_;
atomic< int > flags_;
atomic< int > priority_;
exception_ptr except_;
spinlock splk_;
std::vector< worker_fiber * > waiting_;
fiber_properties * properties_;
public:
worker_fiber( coro_t::yield_type *);
@@ -108,12 +110,6 @@ public:
id get_id() const BOOST_NOEXCEPT
{ return id( const_cast< worker_fiber * >( this) ); }
int priority() const BOOST_NOEXCEPT
{ return priority_; }
void priority( int prio) BOOST_NOEXCEPT
{ priority_ = prio; }
bool join( worker_fiber *);
bool interruption_blocked() const BOOST_NOEXCEPT
@@ -126,11 +122,6 @@ public:
void request_interruption( bool req) BOOST_NOEXCEPT;
bool thread_affinity() const BOOST_NOEXCEPT
{ return 0 != ( flags_.load() & flag_thread_affinity); }
void thread_affinity( bool req) BOOST_NOEXCEPT;
bool is_terminated() const BOOST_NOEXCEPT
{ return TERMINATED == state_; }
@@ -214,15 +205,6 @@ public:
BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm
}
worker_fiber * next() const BOOST_NOEXCEPT
{ return nxt_; }
void next( worker_fiber * nxt) BOOST_NOEXCEPT
{ nxt_ = nxt; }
void next_reset() BOOST_NOEXCEPT
{ nxt_ = 0; }
chrono::high_resolution_clock::time_point const& time_point() const BOOST_NOEXCEPT
{ return tp_; }
@@ -254,6 +236,13 @@ public:
f->deallocate();
}
}
void set_properties( fiber_properties* props);
fiber_properties* get_properties()
{
return properties_;
}
};
}}}

View File

@@ -26,6 +26,7 @@
#include <boost/fiber/detail/setup.hpp>
#include <boost/fiber/detail/trampoline.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/fiber_manager.hpp>
#include <boost/fiber/stack_allocator.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
@@ -107,8 +108,10 @@ public:
impl_()
{}
explicit fiber( detail::worker_fiber * impl) BOOST_NOEXCEPT :
impl_( impl)
// This fiber_base* is allowed to be 0 -- call joinable() before performing
// operations on such a fiber object!
explicit fiber( fiber_base * impl) BOOST_NOEXCEPT :
impl_( dynamic_cast<detail::worker_fiber*>(impl))
{}
#ifdef BOOST_MSVC
@@ -305,19 +308,17 @@ public:
id get_id() const BOOST_NOEXCEPT
{ return 0 != impl_ ? impl_->get_id() : id(); }
int priority() const BOOST_NOEXCEPT;
void priority( int) BOOST_NOEXCEPT;
bool thread_affinity() const BOOST_NOEXCEPT;
void thread_affinity( bool) BOOST_NOEXCEPT;
void detach() BOOST_NOEXCEPT;
void join();
void interrupt() BOOST_NOEXCEPT;
template <class PROPS>
PROPS& properties()
{
return fm_properties<PROPS>(impl_);
}
};
inline

View File

@@ -20,7 +20,7 @@
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/detail/waiting_queue.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/fiber.hpp>
//#include <boost/fiber/fiber.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -54,9 +54,9 @@ void fm_resume_( detail::worker_fiber *);
void fm_set_sched_algo( sched_algorithm *);
void fm_spawn( detail::worker_fiber *);
sched_algorithm* fm_get_sched_algo_();
void fm_priority( detail::worker_fiber *, int) BOOST_NOEXCEPT;
void fm_spawn( detail::worker_fiber *);
void fm_wait_interval( chrono::high_resolution_clock::duration const&) BOOST_NOEXCEPT;
template< typename Rep, typename Period >
@@ -98,6 +98,21 @@ chrono::high_resolution_clock::time_point fm_next_wakeup();
void fm_migrate( detail::worker_fiber *);
// implementation for fiber::properties<PROPS>()
template < class PROPS >
PROPS& fm_properties( detail::worker_fiber * f )
{
return dynamic_cast<sched_algorithm_with_properties<PROPS>&>(*fm_get_sched_algo_())
.properties(f);
}
// implementation for this_fiber::properties<PROPS>()
template < class PROPS >
PROPS& fm_properties()
{
return fm_properties<PROPS>(fm_active());
}
}}
# if defined(BOOST_MSVC)

View File

@@ -64,22 +64,13 @@ template< typename Rep, typename Period >
void sleep_for( chrono::duration< Rep, Period > const& timeout_duration)
{ sleep_until( chrono::high_resolution_clock::now() + timeout_duration); }
inline
bool thread_affinity() BOOST_NOEXCEPT
template < class PROPS >
PROPS& properties()
{
return 0 != fibers::fm_active()
? fibers::fm_active()->thread_affinity()
: true;
return fibers::fm_properties<PROPS>();
}
inline
void thread_affinity( bool req) BOOST_NOEXCEPT
{
if ( 0 != fibers::fm_active() )
fibers::fm_active()->thread_affinity( req);
}
}
} // this_fiber
namespace fibers {

View File

@@ -0,0 +1,80 @@
// Copyright Nat Goodspeed 2014.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// Define fiber_properties, a base class from which a library consumer can
// derive a subclass with specific properties important to a user-coded
// scheduler.
#ifndef BOOST_FIBERS_PROPERTIES_HPP
#define BOOST_FIBERS_PROPERTIES_HPP
#include <boost/fiber/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
# if defined(BOOST_MSVC)
# pragma warning(push)
# pragma warning(disable:4275)
# endif
namespace boost {
namespace fibers {
namespace detail {
class worker_fiber;
} // detail
struct sched_algorithm;
class fiber_properties
{
protected:
// initialized by constructor
detail::worker_fiber* fiber_;
// set every time this fiber becomes READY
sched_algorithm* sched_algo_;
// Inform the relevant sched_algorithm instance that something important
// has changed, so it can (presumably) adjust its data structures
// accordingly.
void notify();
public:
// fiber_properties, and by implication every subclass, must accept a back
// pointer to its worker_fiber.
typedef detail::worker_fiber* back_ptr;
// Any specific property setter method, after updating the relevant
// instance variable, can/should call notify().
fiber_properties(back_ptr f):
fiber_(f),
sched_algo_(0)
{}
// We need a virtual destructor (hence a vtable) because fiber_properties
// is stored polymorphically (as fiber_properties*) in worker_fiber, and
// destroyed via that pointer.
~fiber_properties() {}
// not really intended for public use, but sched_algorithm_with_properties
// must be able to call this
void set_sched_algorithm(sched_algorithm* algo)
{
sched_algo_ = algo;
}
};
}} // namespace boost::fibers
# if defined(BOOST_MSVC)
# pragma warning(pop)
# endif
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_PROPERTIES_HPP

View File

@@ -33,11 +33,9 @@ private:
rqueue_t rqueue_;
public:
virtual void awakened( detail::worker_fiber *);
virtual void awakened( fiber_base *);
virtual detail::worker_fiber * pick_next();
virtual void priority( detail::worker_fiber *, int) BOOST_NOEXCEPT;
virtual fiber_base * pick_next();
};
}}

View File

@@ -90,7 +90,7 @@ private:
state state_;
typename node_type::ptr head_;
typename node_type::ptr tail_;
typename node_type::ptr * tail_;
mutable mutex mtx_;
condition not_empty_cond_;
@@ -132,13 +132,8 @@ private:
void push_tail_( typename node_type::ptr new_node)
{
if ( is_empty_() )
head_ = tail_ = new_node;
else
{
tail_->next = new_node;
tail_ = new_node;
}
*tail_ = new_node;
tail_ = &new_node->next;
}
value_type value_pop_()
@@ -161,7 +156,7 @@ private:
{
typename node_type::ptr old_head = head_;
head_ = old_head->next;
if ( 0 == head_) tail_ = 0;
if ( 0 == head_) tail_ = &head_;
old_head->next = 0;
return old_head;
}
@@ -170,7 +165,7 @@ public:
unbounded_queue() :
state_( OPEN),
head_(),
tail_( head_),
tail_( &head_),
mtx_(),
not_empty_cond_()
{}

View File

@@ -14,6 +14,7 @@
#include "boost/fiber/detail/scheduler.hpp"
#include "boost/fiber/exceptions.hpp"
#include "boost/fiber/properties.hpp"
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -29,21 +30,22 @@ worker_fiber::worker_fiber( coro_t::yield_type * callee) :
fiber_base(),
use_count_( 1), // allocated on stack
fss_data_(),
nxt_( 0),
tp_( (chrono::high_resolution_clock::time_point::max)() ),
callee_( callee),
caller_(),
state_( READY),
flags_( 0),
priority_( 0),
except_(),
waiting_()
waiting_(),
properties_(0)
{ BOOST_ASSERT( callee_); }
worker_fiber::~worker_fiber()
{
BOOST_ASSERT( is_terminated() );
BOOST_ASSERT( waiting_.empty() );
delete properties_;
}
void
@@ -95,15 +97,6 @@ worker_fiber::request_interruption( bool req) BOOST_NOEXCEPT
flags_ &= ~flag_interruption_requested;
}
void
worker_fiber::thread_affinity( bool req) BOOST_NOEXCEPT
{
if ( req)
flags_ |= flag_thread_affinity;
else
flags_ &= ~flag_thread_affinity;
}
void *
worker_fiber::get_fss_data( void const* vp) const
{
@@ -142,6 +135,13 @@ worker_fiber::set_fss_data(
fss_data( data, cleanup_fn) ) );
}
void
worker_fiber::set_properties( fiber_properties* props)
{
delete properties_;
properties_ = props;
}
}}}
#ifdef BOOST_HAS_ABI_HEADERS

View File

@@ -28,38 +28,6 @@ fiber::start_()
fm_spawn( impl_.get() );
}
int
fiber::priority() const BOOST_NOEXCEPT
{
BOOST_ASSERT( impl_);
return impl_->priority();
}
void
fiber::priority( int prio) BOOST_NOEXCEPT
{
BOOST_ASSERT( impl_);
fm_priority( impl_.get(), prio);
}
bool
fiber::thread_affinity() const BOOST_NOEXCEPT
{
BOOST_ASSERT( impl_);
return impl_->thread_affinity();
}
void
fiber::thread_affinity( bool req) BOOST_NOEXCEPT
{
BOOST_ASSERT( impl_);
impl_->thread_affinity( req);
}
void
fiber::join()
{

View File

@@ -102,6 +102,14 @@ void fm_set_sched_algo( sched_algorithm * algo)
fm->def_algo_.reset();
}
sched_algorithm* fm_get_sched_algo_()
{
fiber_manager * fm = detail::scheduler::instance();
BOOST_ASSERT( 0 != fm);
return fm->sched_algo_;
}
chrono::high_resolution_clock::time_point fm_next_wakeup()
{
fiber_manager * fm = detail::scheduler::instance();
@@ -132,16 +140,6 @@ void fm_spawn( detail::worker_fiber * f)
fm->sched_algo_->awakened( f);
}
void fm_priority( detail::worker_fiber * f,
int prio) BOOST_NOEXCEPT
{
fiber_manager * fm = detail::scheduler::instance();
BOOST_ASSERT( 0 != fm);
fm->sched_algo_->priority( f, prio);
}
void fm_wait_interval( chrono::high_resolution_clock::duration const& wait_interval) BOOST_NOEXCEPT
{
fiber_manager * fm = detail::scheduler::instance();
@@ -166,17 +164,17 @@ void fm_run()
BOOST_ASSERT( 0 != fm);
// move all fibers witch are ready (state_ready)
// move all fibers which are ready (state_ready)
// from waiting-queue to the runnable-queue
fm->wqueue_.move_to( fm->sched_algo_, fetch_ready);
// pop new fiber from ready-queue which is not complete
// (example: fiber in ready-queue could be canceled by active-fiber)
detail::worker_fiber * f( fm->sched_algo_->pick_next() );
fiber_base * f( fm->sched_algo_->pick_next());
if ( f)
{
BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue");
fm_resume_( f);
fm_resume_( static_cast<detail::worker_fiber*>(f));
}
else
{

28
src/properties.cpp Normal file
View File

@@ -0,0 +1,28 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <boost/assert.hpp>
#include "boost/fiber/properties.hpp"
#include "boost/fiber/algorithm.hpp"
#include "boost/fiber/fiber_manager.hpp"
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
void fiber_properties::notify()
{
BOOST_ASSERT(sched_algo_);
sched_algo_->property_change(fiber_, this);
}
}} // boost::fiber
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

View File

@@ -16,32 +16,22 @@ namespace boost {
namespace fibers {
void
round_robin::awakened( detail::worker_fiber * f)
round_robin::awakened( fiber_base * f)
{
BOOST_ASSERT( 0 != f);
rqueue_.push( f);
}
detail::worker_fiber *
fiber_base *
round_robin::pick_next()
{
detail::worker_fiber * victim = 0;
fiber_base * victim = 0;
if ( ! rqueue_.empty() )
victim = rqueue_.pop();
return victim;
}
void
round_robin::priority( detail::worker_fiber * f, int prio) BOOST_NOEXCEPT
{
BOOST_ASSERT( f);
// set only priority to fiber
// round-robin does not respect priorities
f->priority( prio);
}
}}
#ifdef BOOST_HAS_ABI_HEADERS

View File

@@ -179,15 +179,6 @@ void test_move()
}
}
void test_priority()
{
boost::fibers::fiber f( f1);
BOOST_CHECK_EQUAL( 0, f.priority() );
f.priority( 7);
BOOST_CHECK_EQUAL( 7, f.priority() );
f.join();
}
void test_id()
{
boost::fibers::fiber s1;
@@ -353,7 +344,6 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* [])
test->add( BOOST_TEST_CASE( & test_move) );
test->add( BOOST_TEST_CASE( & test_id) );
test->add( BOOST_TEST_CASE( & test_priority) );
test->add( BOOST_TEST_CASE( & test_detach) );
test->add( BOOST_TEST_CASE( & test_complete) );
test->add( BOOST_TEST_CASE( & test_join_in_thread) );