2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-16 13:22:17 +00:00

protect worker_queue by spinlock

This commit is contained in:
Oliver Kowalke
2015-10-21 15:26:06 +02:00
parent f743b63bc0
commit cfc4b6f292
2 changed files with 18 additions and 26 deletions

View File

@@ -81,6 +81,7 @@ private:
sleep_queue_t sleep_queue_;
bool shutdown_;
detail::spinlock remote_ready_splk_;
detail::spinlock worker_splk_;
void resume_( context *, context *, std::function< void() > *);

View File

@@ -26,12 +26,7 @@ void
scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > * func) {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
BOOST_ASSERT( main_ctx_ == ctx ||
dispatcher_ctx_.get() == ctx ||
ctx->worker_is_linked() );
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( this == active_ctx->get_scheduler() );
BOOST_ASSERT( this == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
@@ -41,9 +36,6 @@ scheduler::resume_( context * active_ctx, context * ctx, std::function< void() >
// resume active-fiber == ctx
func = ctx->resume( func);
BOOST_ASSERT( context::active() == active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
if ( nullptr != func) {
( * func)();
}
@@ -67,12 +59,14 @@ scheduler::release_terminated_() {
context * ctx = & ( * i);
BOOST_ASSERT( ! ctx->is_main_context() );
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
BOOST_ASSERT( ctx->worker_is_linked() );
//BOOST_ASSERT( ctx->worker_is_linked() );
BOOST_ASSERT( ctx->is_terminated() );
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ! ctx->sleep_is_linked() );
// remove context from worker-queue
std::unique_lock< detail::spinlock > lk( worker_splk_);
ctx->worker_unlink();
lk.unlock();
// remove context from terminated-queue
i = terminated_queue_.erase( i);
// if last reference, e.g. fiber::join() or fiber::detach()
@@ -106,8 +100,7 @@ scheduler::sleep2ready_() noexcept {
for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) {
context * ctx = & ( * i);
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
BOOST_ASSERT( main_ctx_ == ctx ||
ctx->worker_is_linked() );
//BOOST_ASSERT( main_ctx_ == ctx || ctx->worker_is_linked() );
BOOST_ASSERT( ! ctx->is_terminated() );
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ctx->sleep_is_linked() );
@@ -136,7 +129,8 @@ scheduler::scheduler() noexcept :
remote_ready_queue_(),
sleep_queue_(),
shutdown_( false),
remote_ready_splk_() {
remote_ready_splk_(),
worker_splk_() {
}
scheduler::~scheduler() noexcept {
@@ -148,7 +142,7 @@ scheduler::~scheduler() noexcept {
// resume pending fibers
resume_( main_ctx_, get_next_(), nullptr);
// no context' in worker-queue
BOOST_ASSERT( worker_queue_.empty() );
//BOOST_ASSERT( worker_queue_.empty() );
BOOST_ASSERT( terminated_queue_.empty() );
BOOST_ASSERT( ! sched_algo_->has_ready_fibers() );
BOOST_ASSERT( remote_ready_queue_.empty() );
@@ -201,8 +195,8 @@ scheduler::dispatch() {
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
}
// loop till all context' have been terminated
std::unique_lock< detail::spinlock > lk( worker_splk_);
while ( ! worker_queue_.empty() ) {
release_terminated_();
// force unwinding of all context' in worker-queue
worker_queue_t::iterator e = worker_queue_.end();
for ( worker_queue_t::iterator i = worker_queue_.begin(); i != e;) {
@@ -225,6 +219,7 @@ scheduler::dispatch() {
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
}
}
lk.unlock();
// release termianted context'
release_terminated_();
// return to main-context
@@ -281,7 +276,7 @@ scheduler::set_terminated( context * active_ctx) noexcept {
BOOST_ASSERT( context::active() == active_ctx);
BOOST_ASSERT( ! active_ctx->is_main_context() );
BOOST_ASSERT( ! active_ctx->is_dispatcher_context() );
BOOST_ASSERT( active_ctx->worker_is_linked() );
//BOOST_ASSERT( active_ctx->worker_is_linked() );
BOOST_ASSERT( active_ctx->is_terminated() );
BOOST_ASSERT( ! active_ctx->ready_is_linked() );
BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
@@ -297,9 +292,7 @@ scheduler::set_terminated( context * active_ctx) noexcept {
void
scheduler::yield( context * active_ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( ! active_ctx->is_terminated() );
BOOST_ASSERT( ! active_ctx->ready_is_linked() );
BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
@@ -323,9 +316,7 @@ scheduler::wait_until( context * active_ctx,
std::chrono::steady_clock::time_point const& sleep_tp,
std::function< void() > * func) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( ! active_ctx->is_terminated() );
// if the active-fiber running in this thread calls
// condition:wait() and code in another thread calls
@@ -354,9 +345,7 @@ void
scheduler::suspend( context * active_ctx,
std::function< void() > * func) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( main_ctx_ == active_ctx ||
dispatcher_ctx_.get() == active_ctx ||
active_ctx->worker_is_linked() );
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
// resume another context
resume_( active_ctx, get_next_(), func);
}
@@ -412,7 +401,8 @@ scheduler::attach_worker_context( context * ctx) noexcept {
BOOST_ASSERT( ! ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->terminated_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
BOOST_ASSERT( ! ctx->worker_is_linked() );
//BOOST_ASSERT( ! ctx->worker_is_linked() );
std::unique_lock< detail::spinlock > lk( worker_splk_);
ctx->worker_link( worker_queue_);
ctx->scheduler_ = this;
}
@@ -426,6 +416,7 @@ scheduler::detach_worker_context( context * ctx) noexcept {
BOOST_ASSERT( ! ctx->terminated_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
std::unique_lock< detail::spinlock > lk( worker_splk_);
ctx->worker_unlink();
}