2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-16 13:22:17 +00:00

fix using intruisve::list<>

This commit is contained in:
Oliver Kowalke
2015-09-10 18:08:10 +02:00
parent 2173200c14
commit d3843efbe0
17 changed files with 184 additions and 157 deletions

View File

@@ -27,9 +27,9 @@ namespace fibers {
scheduler::scheduler( context * main_context) noexcept :
sched_algo_( new round_robin() ),
main_context_( main_context),
wqueue_(),
tqueue_(),
yqueue_(),
ready_queue_(),
sleep_queue_(),
terminated_queue_(),
wait_interval_( std::chrono::milliseconds( 10) ) {
}
@@ -39,15 +39,15 @@ scheduler::~scheduler() noexcept {
// NOTE: at this stage the fibers in the waiting-queue
// can only be detached fibers
// interrupt all waiting fibers (except main-fiber)
for ( context & f : wqueue_) {
for ( context & f : sleep_queue_) {
f.request_interruption( true);
}
// move all fibers which are ready (state_ready)
// from waiting-queue to the runnable-queue
std::chrono::steady_clock::time_point now(
std::chrono::steady_clock::now() );
wqueue_t::iterator e = wqueue_.end();
for ( wqueue_t::iterator i = wqueue_.begin(); i != e;) {
sleep_queue_t::iterator e = sleep_queue_.end();
for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) {
context * f = & ( * i);
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
@@ -59,12 +59,14 @@ scheduler::~scheduler() noexcept {
}
if ( f->is_ready() ) {
i = wqueue_.erase( i);
// Pass the newly-unlinked context* to sched_algo.
i = sleep_queue_.erase( i);
// unlink after accesing iterator
f->time_point_reset();
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
// Pass the newly-unlinked context* to sched_algo.
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
//BOOST_ASSERT( ! f->wait_is_linked() );
sched_algo_->awakened( f);
} else {
++i;
@@ -72,10 +74,11 @@ scheduler::~scheduler() noexcept {
}
// pop new fiber from ready-queue
context * f( sched_algo_->pick_next() );
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
if ( nullptr != f) {
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
//BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue");
// set scheduler
f->set_scheduler( this);
@@ -86,29 +89,29 @@ scheduler::~scheduler() noexcept {
// set main-fiber to state_waiting
main_context_->set_waiting();
// push main-fiber to waiting-queue
BOOST_ASSERT( ! main_context_->state_is_linked() );
yqueue_.push_back( * main_context_);
BOOST_ASSERT( ! main_context_->runnable_is_linked() );
ready_queue_.push_back( * main_context_);
// resume fiber f
resume_( main_context_, f);
} else if ( wqueue_.empty() ) {
} else if ( sleep_queue_.empty() ) {
// ready- and waiting-queue are empty so we can quit
break;
}
}
// destroy terminated fibers from terminated-queue
tqueue_t::iterator e = tqueue_.end();
for ( tqueue_t::iterator i = tqueue_.begin(); i != e;) {
context * f( & ( * i) );
for ( context * f : terminated_queue_) {
BOOST_ASSERT( f->is_terminated() );
i = tqueue_.erase( i);
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
intrusive_ptr_release( f); // might call ~context()
}
terminated_queue_.clear();
BOOST_ASSERT( context::active() == main_context_);
BOOST_ASSERT( wqueue_.empty() );
BOOST_ASSERT( yqueue_.empty() );
BOOST_ASSERT( ready_queue_.empty() );
BOOST_ASSERT( sleep_queue_.empty() );
BOOST_ASSERT( terminated_queue_.empty() );
BOOST_ASSERT( 0 == sched_algo_->ready_fibers() );
}
@@ -132,8 +135,8 @@ scheduler::resume_( context * af, context * f) {
context::active( f);
// push terminated fibers to terminated-queue
if ( af->is_terminated() ) {
BOOST_ASSERT( ! af->state_is_linked() );
tqueue_.push_back( * af);
BOOST_ASSERT( ! af->runnable_is_linked() );
terminated_queue_.push_back( af);
}
// resume active-fiber == f
f->resume();
@@ -145,9 +148,9 @@ scheduler::spawn( context * f) {
BOOST_ASSERT( f->is_ready() );
BOOST_ASSERT( f != context::active() );
// add new fiber to the scheduler
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
sched_algo_->awakened( f);
}
@@ -158,15 +161,16 @@ scheduler::run( context * af) {
for (;;) {
{
// move all fibers in yield-queue to ready-queue
yqueue_t::iterator e = yqueue_.end();
for ( yqueue_t::iterator i = yqueue_.begin(); i != e;) {
ready_queue_t::iterator e = ready_queue_.end();
for ( ready_queue_t::iterator i = ready_queue_.begin(); i != e;) {
context * f = & ( * i);
BOOST_ASSERT( f->is_ready() );
i = yqueue_.erase( i);
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
i = ready_queue_.erase( i);
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
//BOOST_ASSERT( ! f->wait_is_linked() );
sched_algo_->awakened( f);
}
}
@@ -175,8 +179,8 @@ scheduler::run( context * af) {
// from waiting-queue to the ready-queue
std::chrono::steady_clock::time_point now(
std::chrono::steady_clock::now() );
wqueue_t::iterator e = wqueue_.end();
for ( wqueue_t::iterator i = wqueue_.begin(); i != e;) {
sleep_queue_t::iterator e = sleep_queue_.end();
for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) {
context * f = & ( * i);
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
@@ -186,12 +190,14 @@ scheduler::run( context * af) {
f->set_ready();
}
if ( f->is_ready() ) {
i = wqueue_.erase( i);
// Pass the newly-unlinked context* to sched_algo.
i = sleep_queue_.erase( i);
// unlink only after accessing iterator
f->time_point_reset();
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
// Pass the newly-unlinked context* to sched_algo.
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
//BOOST_ASSERT( ! f->wait_is_linked() );
sched_algo_->awakened( f);
} else {
++i;
@@ -201,20 +207,23 @@ scheduler::run( context * af) {
// pop new fiber from ready-queue
context * f( sched_algo_->pick_next() );
if ( nullptr != f) {
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
//BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue");
// resume fiber f
resume_( af, f);
// destroy terminated fibers from terminated-queue
tqueue_t::iterator e = tqueue_.end();
for ( tqueue_t::iterator i = tqueue_.begin(); i != e;) {
context * f( & ( * i) );
for ( context * f : terminated_queue_) {
BOOST_ASSERT( f->is_terminated() );
i = tqueue_.erase( i);
BOOST_ASSERT( ! f->state_is_linked() );
BOOST_ASSERT( ! f->runnable_is_linked() );
BOOST_ASSERT( ! f->ready_is_linked() );
BOOST_ASSERT( ! f->sleep_is_linked() );
BOOST_ASSERT( ! f->wait_is_linked() );
BOOST_ASSERT( ! f->yield_is_linked() );
intrusive_ptr_release( f); // might call ~context()
}
terminated_queue_.clear();
return;
} else {
// no fibers ready to run; the thread should sleep
@@ -246,8 +255,8 @@ scheduler::wait_until( context * af,
lk.unlock();
// push active-fiber to waiting-queue
af->time_point( timeout_time);
BOOST_ASSERT( ! af->state_is_linked() );
wqueue_.push_back( * af);
BOOST_ASSERT( ! af->sleep_is_linked() );
sleep_queue_.push_back( * af);
// switch to another fiber
run( af);
// fiber has been resumed
@@ -264,9 +273,9 @@ scheduler::yield( context * af) {
BOOST_ASSERT( af->is_running() );
// set active-fiber to state_waiting
af->set_ready();
// push active-fiber to yqueue_
BOOST_ASSERT( ! af->yield_is_linked() );
yqueue_.push_back( * af);
// push active-fiber to ready_queue_
BOOST_ASSERT( ! af->ready_is_linked() );
ready_queue_.push_back( * af);
// switch to another fiber
run( af);
// fiber has been resumed
@@ -284,8 +293,8 @@ scheduler::join( context * af, context * f) {
// set active-fiber to state_waiting
af->set_waiting();
// push active-fiber to waiting-queue
BOOST_ASSERT( ! af->state_is_linked() );
wqueue_.push_back( * af);
BOOST_ASSERT( ! af->sleep_is_linked() );
sleep_queue_.push_back( * af);
// add active-fiber to joinig-list of f
if ( ! f->join( af) ) {
// f must be already terminated therefore we set