2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-20 02:32:19 +00:00

remove yield-queue + pass functor to resume-op

This commit is contained in:
Oliver Kowalke
2015-10-16 21:48:57 +02:00
parent 6dfd42c343
commit bafd65bcf3
15 changed files with 378 additions and 512 deletions

View File

@@ -23,7 +23,7 @@ namespace boost {
namespace fibers {
void
scheduler::resume_( context * active_ctx, context * ctx) {
scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > * func) {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
@@ -43,19 +43,14 @@ scheduler::resume_( context * active_ctx, context * ctx) {
// assign new fiber to active-fiber
context::active( ctx);
// resume active-fiber == ctx
ctx->resume();
func = ctx->resume( func);
BOOST_ASSERT( context::active() == active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
// move yielded context' to ready-queue
// note that we cann't call simply yield2ready_()
// because the context might be moved to another
// thread during suspension
// `this`might be cached and thus points to the
// previous scheduler (cotnaiend in the previous thread)
active_ctx->get_scheduler()->yield2ready_();
// check if unwinding was requested
if ( nullptr != func) {
( * func)();
}
if ( active_ctx->unwinding_requested() ) {
throw forced_unwind();
}
@@ -107,17 +102,6 @@ scheduler::remote_ready2ready_() {
}
}
void
scheduler::yield2ready_() {
// get context from yield-queue
while ( ! yield_queue_.empty() ) {
context * ctx = & yield_queue_.front();
yield_queue_.pop_front();
BOOST_ASSERT( ! ctx->ready_is_linked() );
set_ready( ctx);
}
}
void
scheduler::sleep2ready_() noexcept {
// move context which the deadline has reached
@@ -157,7 +141,6 @@ scheduler::scheduler() noexcept :
worker_queue_(),
terminated_queue_(),
remote_ready_queue_(),
yield_queue_(),
sleep_queue_(),
shutdown_( false),
remote_ready_splk_() {
@@ -170,13 +153,12 @@ scheduler::~scheduler() noexcept {
// signal dispatcher-context termination
shutdown_ = true;
// resume pending fibers
resume_( main_ctx_, get_next_() );
resume_( main_ctx_, get_next_(), nullptr);
// no context' in worker-queue
BOOST_ASSERT( worker_queue_.empty() );
BOOST_ASSERT( terminated_queue_.empty() );
BOOST_ASSERT( ! sched_algo_->has_ready_fibers() );
BOOST_ASSERT( remote_ready_queue_.empty() );
BOOST_ASSERT( yield_queue_.empty() );
BOOST_ASSERT( sleep_queue_.empty() );
// set active context to nullptr
context::reset_active();
@@ -190,8 +172,6 @@ void
scheduler::dispatch() {
BOOST_ASSERT( context::active() == dispatcher_ctx_);
while ( ! shutdown_) {
// move yielded context' to ready-queue
yield2ready_();
// release termianted context'
release_terminated_();
// get sleeping context'
@@ -224,7 +204,7 @@ scheduler::dispatch() {
// push dispatcher-context to ready-queue
// so that ready-queue never becomes empty
sched_algo_->awakened( dispatcher_ctx_.get() );
resume_( dispatcher_ctx_.get(), ctx);
resume_( dispatcher_ctx_.get(), ctx, nullptr);
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
}
// loop till all context' have been terminated
@@ -247,7 +227,7 @@ scheduler::dispatch() {
// release termianted context'
release_terminated_();
// return to main-context
resume_( dispatcher_ctx_.get(), main_ctx_);
resume_( dispatcher_ctx_.get(), main_ctx_, nullptr);
}
void
@@ -322,18 +302,22 @@ scheduler::yield( context * active_ctx) noexcept {
// we do not test for wait-queue because
// context::wait_is_linked() is not sychronized
// with other threads
// push active context to yield-queue
// defer passing active context to set_ready()
// in work-sharing context (multiple threads read
// from one ready-queue) the context must be
// already suspended until another thread resumes it
active_ctx->yield_link( yield_queue_);
// (== maked as ready)
std::function< void() > func([=](){
set_ready( active_ctx);
});
// resume another fiber
resume_( active_ctx, get_next_() );
resume_( active_ctx, get_next_(), & func);
}
bool
scheduler::wait_until( context * active_ctx,
std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
std::chrono::steady_clock::time_point const& sleep_tp,
std::function< void() > * func) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
@@ -356,20 +340,21 @@ scheduler::wait_until( context * active_ctx,
active_ctx->tp_ = sleep_tp;
active_ctx->sleep_link( sleep_queue_);
// resume another context
resume_( active_ctx, get_next_() );
resume_( active_ctx, get_next_(), func);
// context has been resumed
// check if deadline has reached
return std::chrono::steady_clock::now() < sleep_tp;
}
void
scheduler::re_schedule( context * active_ctx) noexcept {
scheduler::re_schedule( context * active_ctx,
std::function< void() > * func) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
// resume another context
resume_( active_ctx, get_next_() );
resume_( active_ctx, get_next_(), func);
}
bool
@@ -424,7 +409,6 @@ scheduler::attach_worker_context( context * ctx) noexcept {
BOOST_ASSERT( ! ctx->terminated_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
BOOST_ASSERT( ! ctx->worker_is_linked() );
BOOST_ASSERT( ! ctx->yield_is_linked() );
ctx->worker_link( worker_queue_);
ctx->scheduler_ = this;
}
@@ -437,7 +421,6 @@ scheduler::detach_worker_context( context * ctx) noexcept {
BOOST_ASSERT( ! ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->terminated_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
BOOST_ASSERT( ! ctx->yield_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
ctx->worker_unlink();
}