mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-02 08:52:07 +00:00
some modifications -> wqueue loop
This commit is contained in:
@@ -29,8 +29,6 @@ namespace fibers {
|
||||
|
||||
struct BOOST_FIBERS_DECL algorithm : private noncopyable
|
||||
{
|
||||
virtual void spawn( detail::fiber_base::ptr_t const&) = 0;
|
||||
|
||||
virtual void priority( detail::fiber_base::ptr_t const&, int) = 0;
|
||||
|
||||
virtual void join( detail::fiber_base::ptr_t const&) = 0;
|
||||
|
||||
@@ -176,6 +176,17 @@ public:
|
||||
flags_ &= ~flag_interruption_requested;
|
||||
}
|
||||
|
||||
bool wake_up() const BOOST_NOEXCEPT
|
||||
{ return 0 != ( flags_ & flag_wake_up); }
|
||||
|
||||
void wake_up( bool req) BOOST_NOEXCEPT
|
||||
{
|
||||
if ( req)
|
||||
flags_ |= flag_wake_up;
|
||||
else
|
||||
flags_ &= ~flag_wake_up;
|
||||
}
|
||||
|
||||
bool is_terminated() const BOOST_NOEXCEPT
|
||||
{ return state_terminated == state_; }
|
||||
|
||||
@@ -228,7 +239,7 @@ public:
|
||||
}
|
||||
#endif
|
||||
state_t previous = state_.exchange( state_ready, memory_order_seq_cst);
|
||||
BOOST_ASSERT( state_waiting == previous || state_running == previous);
|
||||
BOOST_ASSERT( state_waiting == previous || state_running == previous || state_ready == previous);
|
||||
}
|
||||
|
||||
void set_running() BOOST_NOEXCEPT
|
||||
|
||||
@@ -25,7 +25,8 @@ enum flag_t
|
||||
flag_unwind_stack = 1 << 2,
|
||||
flag_preserve_fpu = 1 << 3,
|
||||
flag_interruption_blocked = 1 << 4,
|
||||
flag_interruption_requested = 1 << 5
|
||||
flag_interruption_requested = 1 << 5,
|
||||
flag_wake_up = 1 << 6
|
||||
};
|
||||
|
||||
}}}
|
||||
|
||||
@@ -49,8 +49,6 @@ public:
|
||||
|
||||
~round_robin() BOOST_NOEXCEPT;
|
||||
|
||||
void spawn( detail::fiber_base::ptr_t const&);
|
||||
|
||||
void priority( detail::fiber_base::ptr_t const&, int);
|
||||
|
||||
void join( detail::fiber_base::ptr_t const&);
|
||||
|
||||
@@ -61,10 +61,7 @@ fiber_base::release()
|
||||
unique_lock< spinlock > lk( joining_mtx_);
|
||||
BOOST_FOREACH( fiber_base::ptr_t & p, joining_)
|
||||
{
|
||||
// active fiber migth join this fiber
|
||||
// therefore do not set to state_ready
|
||||
if ( ! p->is_running() )
|
||||
p->set_ready();
|
||||
p->wake_up( true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -54,90 +54,30 @@ round_robin::~round_robin()
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
round_robin::spawn( detail::fiber_base::ptr_t const& f)
|
||||
{
|
||||
BOOST_ASSERT( f);
|
||||
BOOST_ASSERT( ! f->is_terminated() );
|
||||
BOOST_ASSERT( f != active_fiber_);
|
||||
|
||||
detail::fiber_base::ptr_t tmp = active_fiber_;
|
||||
try
|
||||
{
|
||||
active_fiber_ = f;
|
||||
active_fiber_->set_running();
|
||||
active_fiber_->resume();
|
||||
// after return from fiber::resume() the fiber is:
|
||||
// ready -> already in requeue_
|
||||
// waiting -> already in wqueue_
|
||||
// terminated -> not stored in round_robin/will be deleted
|
||||
// call terminate() in order to release
|
||||
// joining fibers
|
||||
if ( f->is_terminated() )
|
||||
f->release();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
active_fiber_ = tmp;
|
||||
throw;
|
||||
}
|
||||
active_fiber_ = tmp;
|
||||
}
|
||||
|
||||
bool
|
||||
round_robin::run()
|
||||
{
|
||||
#if 0
|
||||
// stable-sort has n*log(n) complexity if n*log(n) extra space is available
|
||||
std::size_t n = wqueue_.size();
|
||||
if ( 1 < n)
|
||||
{
|
||||
std::size_t new_capacity = n * std::log10( n) + n;
|
||||
if ( wqueue_.capacity() < new_capacity)
|
||||
wqueue_.reserve( new_capacity);
|
||||
|
||||
// sort wqueue_ depending on state
|
||||
wqueue_.sort();
|
||||
}
|
||||
|
||||
// check which waiting fiber should be interrupted
|
||||
// make it ready and add it to rqueue_
|
||||
std::pair< wqueue_t::iterator, wqueue_t::iterator > p =
|
||||
wqueue_.equal_range( detail::state_waiting);
|
||||
for ( wqueue_t::iterator i = p.first; i != p.second; ++i)
|
||||
{
|
||||
if ( ( * i)->interruption_requested() )
|
||||
( * i)->set_ready();
|
||||
}
|
||||
|
||||
// copy all ready fibers to rqueue_
|
||||
// and remove fibers from wqueue_
|
||||
p = wqueue_.equal_range( detail::state_ready);
|
||||
if ( p.first != p.second)
|
||||
{
|
||||
unique_lock< detail::spinlock > lk( rqueue_mtx_);
|
||||
rqueue_.insert( rqueue_.end(), p.first, p.second);
|
||||
lk.unlock();
|
||||
wqueue_.erase( p.first, p.second);
|
||||
}
|
||||
#endif
|
||||
// loop over waiting queue
|
||||
wqueue_t wqueue;
|
||||
BOOST_FOREACH( detail::fiber_base::ptr_t const& f, wqueue_)
|
||||
{
|
||||
if ( f->interruption_requested() && ! f->is_ready() )
|
||||
f->set_ready();
|
||||
if ( f->is_ready() )
|
||||
// set fiber to state_ready if interruption was requested
|
||||
if ( f->interruption_requested() || f->wake_up() || f->is_ready() )
|
||||
{
|
||||
f->wake_up( false);
|
||||
f->set_ready();
|
||||
unique_lock< detail::spinlock > lk( rqueue_mtx_);
|
||||
rqueue_.push_back( f);
|
||||
}
|
||||
else
|
||||
// otherwise in the local waiting queue
|
||||
wqueue.push_back( f);
|
||||
}
|
||||
// exchange local with global waiting queue
|
||||
wqueue_.swap( wqueue);
|
||||
|
||||
// pop new fiber from runnable-queue which is not complete
|
||||
// (example: fiber in runnable-queue could be canceled by active-fiber)
|
||||
// pop new fiber from ready-queue which is not complete
|
||||
// (example: fiber in ready-queue could be canceled by active-fiber)
|
||||
detail::fiber_base::ptr_t f;
|
||||
do
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user