2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-14 12:42:28 +00:00

use symmetric_coroutine<>::yield_type::operator()()

This commit is contained in:
Oliver Kowalke
2014-03-15 19:16:13 +01:00
parent 8884e791b1
commit 9f7f74f662
14 changed files with 256 additions and 258 deletions

View File

@@ -28,8 +28,44 @@
namespace boost {
namespace fibers {
detail::worker_fiber::ptr_t
round_robin::pick_next_()
{
detail::worker_fiber::ptr_t victim;
if ( ! rqueue_.empty() )
{
victim.swap( rqueue_.front() );
rqueue_.pop_front();
}
return victim;
}
void
round_robin::resume_( detail::worker_fiber::ptr_t const& f)
{
BOOST_ASSERT( f);
BOOST_ASSERT( f->is_ready() );
// store active fiber in local var
detail::worker_fiber::ptr_t tmp = active_fiber_;
// assign new fiber to active fiber
active_fiber_ = f;
// set active fiber to state_running
active_fiber_->set_running();
// check if active-fiber calls itself
// this might happend if fiber calls yield() and no
// other fiber is in the ready-queue
if ( tmp != active_fiber_)
// resume active-fiber == start or yield to
active_fiber_->resume( tmp.get() );
//BOOST_ASSERT( f == active_fiber_);
// reset active fiber to previous
active_fiber_ = tmp;
}
round_robin::round_robin() BOOST_NOEXCEPT :
active_fiber_(),
active_fiber_( 0),
wqueue_(),
rqueue_(),
mn_()
@@ -52,66 +88,55 @@ round_robin::spawn( detail::worker_fiber::ptr_t const& f)
BOOST_ASSERT( f);
BOOST_ASSERT( f->is_ready() );
// store active fiber in local var
detail::worker_fiber::ptr_t tmp = active_fiber_;
// assign new fiber to active fiber
active_fiber_ = f;
// set active fiber to state_running
active_fiber_->set_running();
// resume active fiber
active_fiber_->resume();
// fiber is resumed
BOOST_ASSERT( f == active_fiber_);
// reset active fiber to previous
active_fiber_ = tmp;
rqueue_.push_back( f);
//resume_( f);
}
bool
void
round_robin::run()
{
wqueue_t wqueue;
// move all fibers witch are ready (state_ready)
// from waiting-queue to the runnable-queue
for ( wqueue_t::iterator it = wqueue_.begin(), end = wqueue_.end(); it != end; ++it )
for (;;)
{
detail::worker_fiber::ptr_t f( it->f);
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
// set fiber to state_ready if dead-line was reached
// set fiber to state_ready if interruption was requested
if ( it->tp <= clock_type::now() || f->interruption_requested() )
f->set_ready();
if ( f->is_ready() )
rqueue_.push_back( f);
else wqueue.push( * it);
}
// exchange local with global waiting queue
std::swap( wqueue_, wqueue);
// pop new fiber from ready-queue which is not complete
// (example: fiber in ready-queue could be canceled by active-fiber)
detail::worker_fiber::ptr_t f;
do
{
if ( rqueue_.empty() )
// move all fibers witch are ready (state_ready)
// from waiting-queue to the runnable-queue
wqueue_t wqueue;
for ( wqueue_t::iterator it = wqueue_.begin(), end = wqueue_.end(); it != end; ++it )
{
this_thread::yield();
return false;
detail::worker_fiber::ptr_t f( it->f);
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
// set fiber to state_ready if dead-line was reached
// set fiber to state_ready if interruption was requested
if ( it->tp <= clock_type::now() || f->interruption_requested() )
f->set_ready();
if ( f->is_ready() )
rqueue_.push_back( f);
else wqueue.push( * it);
}
// exchange local with global waiting queue
std::swap( wqueue_, wqueue);
// pop new fiber from ready-queue which is not complete
// (example: fiber in ready-queue could be canceled by active-fiber)
detail::worker_fiber::ptr_t f = pick_next_();
if ( f)
{
BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue");
resume_( f);
return;
}
else
{
if ( active_fiber_)
active_fiber_->suspend();
else
this_thread::yield();
return;
}
f.swap( rqueue_.front() );
rqueue_.pop_front();
if ( f->is_ready() ) break;
else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue");
}
while ( true);
// resume fiber
spawn( f);
return true;
}
void
@@ -131,9 +156,8 @@ round_robin::wait_until( clock_type::time_point const& timeout_time,
lk.unlock();
// push active fiber to wqueue_
wqueue_.push( schedulable( active_fiber_, timeout_time) );
// suspend active fiber
active_fiber_->suspend();
// fiber is resumed
// run next fiber
run();
return clock_type::now() < timeout_time;
}
@@ -148,9 +172,8 @@ round_robin::yield()
active_fiber_->set_ready();
// push active fiber to wqueue_
wqueue_.push( schedulable( active_fiber_) );
// suspend acitive fiber
active_fiber_->suspend();
// fiber is resumed
// run next fiber
run();
}
void
@@ -171,18 +194,15 @@ round_robin::join( detail::worker_fiber::ptr_t const& f)
// active fiber to state_ready
// FIXME: better state_running and no suspend
active_fiber_->set_ready();
// suspend fiber until f terminates
active_fiber_->suspend();
// fiber is resumed by f
// run next fiber
run();
}
else
{
while ( ! f->is_terminated() )
{
// yield this thread if scheduler did not
// resumed some fibers in the previous round
if ( ! run() ) this_thread::yield();
}
run();
}
BOOST_ASSERT( f->is_terminated() );