2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-16 13:22:17 +00:00

use yield-queue because work-sharing

- yield-queue prevents resumtion of a fiber which is passed to
  awakened() but has not yet suspended
This commit is contained in:
Oliver Kowalke
2015-10-10 21:52:03 +02:00
parent ecab1c5ee6
commit dbfd80fb2d
4 changed files with 86 additions and 11 deletions

View File

@@ -85,6 +85,14 @@ typedef intrusive::list_member_hook<
>
> remote_ready_hook;
struct yield_tag;
typedef intrusive::list_member_hook<
intrusive::tag< yield_tag >,
intrusive::link_mode<
intrusive::auto_unlink
>
> yield_hook;
struct sleep_tag;
typedef intrusive::set_member_hook<
intrusive::tag< sleep_tag >,
@@ -175,6 +183,7 @@ public:
detail::terminated_hook terminated_hook_;
detail::ready_hook ready_hook_;
detail::remote_ready_hook remote_ready_hook_;
detail::yield_hook yield_hook_;
detail::sleep_hook sleep_hook_;
detail::wait_hook wait_hook_;
std::chrono::steady_clock::time_point tp_;
@@ -302,6 +311,7 @@ public:
terminated_hook_(),
ready_hook_(),
remote_ready_hook_(),
yield_hook_(),
sleep_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
@@ -391,6 +401,8 @@ public:
bool remote_ready_is_linked();
bool yield_is_linked();
bool sleep_is_linked();
bool wait_is_linked();
@@ -419,6 +431,12 @@ public:
lst.push_back( * this);
}
template< typename List >
void yield_link( List & lst) {
std::unique_lock< detail::spinlock > lk( hook_splk_);
lst.push_back( * this);
}
template< typename Set >
void sleep_link( Set & set) {
std::unique_lock< detail::spinlock > lk( hook_splk_);
@@ -437,6 +455,8 @@ public:
void remote_ready_unlink();
void yield_unlink();
void sleep_unlink();
void wait_unlink();

View File

@@ -44,6 +44,11 @@ public:
intrusive::member_hook<
context, detail::remote_ready_hook, & context::remote_ready_hook_ >,
intrusive::constant_time_size< false > > remote_ready_queue_t;
typedef intrusive::list<
context,
intrusive::member_hook<
context, detail::yield_hook, & context::yield_hook_ >,
intrusive::constant_time_size< false > > yield_queue_t;
typedef intrusive::set<
context,
intrusive::member_hook<
@@ -80,9 +85,11 @@ private:
remote_ready_queue_t remote_ready_queue_;
// sleep-queue cotnains context' whic hahve been called
// scheduler::wait_until()
yield_queue_t yield_queue_;
sleep_queue_t sleep_queue_;
bool shutdown_;
detail::spinlock remote_ready_splk_;
std::mutex mtx_;
void resume_( context *, context *);
@@ -92,6 +99,8 @@ private:
void remote_ready2ready_();
void yield2ready_();
void sleep2ready_() noexcept;
public:

View File

@@ -170,6 +170,7 @@ context::context( main_context_t) :
terminated_hook_(),
ready_hook_(),
remote_ready_hook_(),
yield_hook_(),
sleep_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
@@ -197,6 +198,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall
terminated_hook_(),
ready_hook_(),
remote_ready_hook_(),
yield_hook_(),
sleep_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
@@ -418,6 +420,12 @@ context::remote_ready_is_linked() {
return remote_ready_hook_.is_linked();
}
bool
context::yield_is_linked() {
std::unique_lock< detail::spinlock > lk( hook_splk_);
return yield_hook_.is_linked();
}
bool
context::sleep_is_linked() {
std::unique_lock< detail::spinlock > lk( hook_splk_);
@@ -448,6 +456,12 @@ context::remote_ready_unlink() {
remote_ready_hook_.unlink();
}
void
context::yield_unlink() {
std::unique_lock< detail::spinlock > lk( hook_splk_);
yield_hook_.unlink();
}
void
context::sleep_unlink() {
std::unique_lock< detail::spinlock > lk( hook_splk_);

View File

@@ -48,6 +48,9 @@ scheduler::resume_( context * active_ctx, context * ctx) {
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
// move yielded context' to ready-queue
yield2ready_();
// check if unwinding was requested
if ( active_ctx->unwinding_requested() ) {
throw forced_unwind();
}
@@ -55,7 +58,14 @@ scheduler::resume_( context * active_ctx, context * ctx) {
context *
scheduler::get_next_() noexcept {
return sched_algo_->pick_next();
context * ctx = sched_algo_->pick_next();
if ( nullptr != ctx &&
! ctx->worker_is_linked() &&
! ctx->is_main_context() &&
! ctx->is_dispatcher_context() ) {
ctx->worker_link( worker_queue_);
}
return ctx;
}
void
@@ -94,6 +104,18 @@ scheduler::remote_ready2ready_() {
}
}
void
scheduler::yield2ready_() {
std::unique_lock< std::mutex > lk( mtx_);
// get context from yield-queue
while ( ! yield_queue_.empty() ) {
context * ctx = & yield_queue_.front();
yield_queue_.pop_front();
BOOST_ASSERT( ! ctx->ready_is_linked() );
set_ready( ctx);
}
}
void
scheduler::sleep2ready_() noexcept {
// move context which the deadline has reached
@@ -133,6 +155,7 @@ scheduler::scheduler() noexcept :
worker_queue_(),
terminated_queue_(),
remote_ready_queue_(),
yield_queue_(),
sleep_queue_(),
shutdown_( false),
remote_ready_splk_() {
@@ -151,6 +174,7 @@ scheduler::~scheduler() noexcept {
BOOST_ASSERT( terminated_queue_.empty() );
BOOST_ASSERT( ! sched_algo_->has_ready_fibers() );
BOOST_ASSERT( remote_ready_queue_.empty() );
BOOST_ASSERT( yield_queue_.empty() );
BOOST_ASSERT( sleep_queue_.empty() );
// set active context to nullptr
context::reset_active();
@@ -193,6 +217,8 @@ void
scheduler::dispatch() {
BOOST_ASSERT( context::active() == dispatcher_ctx_);
while ( ! shutdown_) {
// move yielded context' to ready-queue
yield2ready_();
// release termianted context'
release_terminated_();
// get sleeping context'
@@ -247,8 +273,10 @@ void
scheduler::set_ready( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ! ctx->is_terminated() );
// dispatcher-context will never be passed to set_ready()
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
// we do not test for wait-queue because
// context::wait_is_linked() is not sychronized
// context::wait_is_linked() is not synchronized
// with other threads
//BOOST_ASSERT( active_ctx->wait_is_linked() );
// handle newly created context
@@ -260,7 +288,7 @@ scheduler::set_ready( context * ctx) noexcept {
ctx->worker_link( worker_queue_);
}
} else {
// sanity checks, main-contxt might by signaled
// sanity checks, main-context might by signaled
// from another thread
BOOST_ASSERT( main_ctx_ == ctx);
BOOST_ASSERT( this == ctx->get_scheduler() );
@@ -272,13 +300,12 @@ scheduler::set_ready( context * ctx) noexcept {
// unlink it from sleep-queue
ctx->sleep_unlink();
}
// if context is already in ready-queue, do return
// this might happend if a newly created fiber was
// for safety unlink it from ready-queue
// this might happen if a newly created fiber was
// signaled to interrupt
if ( ! ctx->ready_is_linked() ) {
// push new context to ready-queue
sched_algo_->awakened( ctx);
}
ctx->ready_unlink();
// push new context to ready-queue
sched_algo_->awakened( ctx);
}
void
@@ -329,8 +356,13 @@ scheduler::yield( context * active_ctx) noexcept {
// we do not test for wait-queue because
// context::wait_is_linked() is not sychronized
// with other threads
// push active context to ready-queue
sched_algo_->awakened( active_ctx);
// push active context to yield-queue
// in work-sharing context (multiple threads read
// from one ready-queue) the context must be
// already suspended until another thread resumes it
std::unique_lock< std::mutex > lk( mtx_);
active_ctx->yield_link( yield_queue_);
lk.unlock();
// resume another fiber
resume_( active_ctx, get_next_() );
}