From 559e49898235a1ace17fbfd252df07f5f7896002 Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sat, 17 Oct 2015 15:10:28 +0200 Subject: [PATCH] context::terminate() as replacement for set_terminated_()/release() --- include/boost/fiber/context.hpp | 17 ++++------ src/context.cpp | 56 ++++++++++++++------------------- src/scheduler.cpp | 32 +++++++++---------- 3 files changed, 46 insertions(+), 59 deletions(-) diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 8c072aa5..1a0f1da9 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -191,8 +191,6 @@ private: detail::spinlock splk_; fiber_properties * properties_; - void set_terminated_() noexcept; - public: class id { private: @@ -281,6 +279,7 @@ public: auto tpl( std::move( tpl_) ); // jump back after initialization void * vp = ctx(); + // execute returned functor if ( nullptr != vp) { std::function< void() > * func( static_cast< std::function< void() > * >( vp) ); ( * func)(); @@ -294,12 +293,8 @@ public: } catch ( ... ) { std::terminate(); } - // mark fiber as terminated - set_terminated_(); - // notify waiting (joining) fibers - release(); - // switch to another fiber - suspend(); + // terminate context + terminate(); BOOST_ASSERT_MSG( false, "fiber already terminated"); }), ready_hook_(), @@ -325,14 +320,14 @@ public: std::function< void() > * resume( std::function< void() > *); - void suspend( std::function< void() > * = nullptr) noexcept; - - void release() noexcept; + void suspend( std::function< void() > *) noexcept; void join(); void yield() noexcept; + void terminate() noexcept; + bool wait_until( std::chrono::steady_clock::time_point const&, std::function< void() > * = nullptr) noexcept; diff --git a/src/context.cpp b/src/context.cpp index 142a2330..9268ed52 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -152,14 +152,6 @@ context::reset_active() noexcept { active_ = nullptr; } -void -context::set_terminated_() noexcept { - // protect for concurrent access - std::unique_lock< detail::spinlock > lk( splk_); - flags_ |= flag_terminated; - scheduler_->set_terminated( this); -} - // main fiber context context::context( main_context_t) : use_count_( 1), // allocated on main- or thread-stack @@ -238,30 +230,6 @@ context::suspend( std::function< void() > * func) noexcept { scheduler_->re_schedule( this, func); } -void -context::release() noexcept { - BOOST_ASSERT( is_terminated() ); - wait_queue_t tmp; - // protect for concurrent access - std::unique_lock< detail::spinlock > lk( splk_); - tmp.swap( wait_queue_); - lk.unlock(); - // notify all waiting fibers - wait_queue_t::iterator e = tmp.end(); - for ( wait_queue_t::iterator i = tmp.begin(); i != e;) { - context * ctx = & ( * i); - // remove fiber from wait-queue - i = tmp.erase( i); - // notify scheduler - scheduler_->set_ready( ctx); - } - // release fiber-specific-data - for ( fss_data_t::value_type & data : fss_data_) { - data.second.do_cleanup(); - } - fss_data_.clear(); -} - void context::join() { // get active context @@ -296,6 +264,30 @@ context::yield() noexcept { scheduler_->yield( active_ctx); } +void +context::terminate() noexcept { + // protect for concurrent access + std::unique_lock< detail::spinlock > lk( splk_); + // mark as terminated + flags_ |= flag_terminated; + // notify all waiting fibers + while ( ! wait_queue_.empty() ) { + context * ctx = & wait_queue_.front(); + // remove fiber from wait-queue + wait_queue_.pop_front(); + // notify scheduler + scheduler_->set_ready( ctx); + } + lk.unlock(); + // release fiber-specific-data + for ( fss_data_t::value_type & data : fss_data_) { + data.second.do_cleanup(); + } + fss_data_.clear(); + // switch to another context + scheduler_->set_terminated( this); +} + bool context::wait_until( std::chrono::steady_clock::time_point const& tp, std::function< void() > * func) noexcept { diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 9a62a4ac..1b5b0fae 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -88,15 +88,12 @@ scheduler::release_terminated_() { void scheduler::remote_ready2ready_() { - remote_ready_queue_t tmp; // protect for concurrent access std::unique_lock< detail::spinlock > lk( remote_ready_splk_); - remote_ready_queue_.swap( tmp); - lk.unlock(); // get context from remote ready-queue - while ( ! tmp.empty() ) { - context * ctx = & tmp.front(); - tmp.pop_front(); + while ( ! remote_ready_queue_.empty() ) { + context * ctx = & remote_ready_queue_.front(); + remote_ready_queue_.pop_front(); // store context in local queues set_ready( ctx); } @@ -275,19 +272,22 @@ scheduler::set_remote_ready( context * ctx) noexcept { } void -scheduler::set_terminated( context * ctx) noexcept { - BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( ! ctx->is_main_context() ); - BOOST_ASSERT( ! ctx->is_dispatcher_context() ); - BOOST_ASSERT( ctx->worker_is_linked() ); - BOOST_ASSERT( ctx->is_terminated() ); - BOOST_ASSERT( ! ctx->ready_is_linked() ); - BOOST_ASSERT( ! ctx->sleep_is_linked() ); - BOOST_ASSERT( ! ctx->wait_is_linked() ); +scheduler::set_terminated( context * active_ctx) noexcept { + BOOST_ASSERT( nullptr != active_ctx); + BOOST_ASSERT( context::active() == active_ctx); + BOOST_ASSERT( ! active_ctx->is_main_context() ); + BOOST_ASSERT( ! active_ctx->is_dispatcher_context() ); + BOOST_ASSERT( active_ctx->worker_is_linked() ); + BOOST_ASSERT( active_ctx->is_terminated() ); + BOOST_ASSERT( ! active_ctx->ready_is_linked() ); + BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); // store the terminated fiber in the terminated-queue // the dispatcher-context will call // intrusive_ptr_release( ctx); - ctx->terminated_link( terminated_queue_); + active_ctx->terminated_link( terminated_queue_); + // resume another fiber + resume_( active_ctx, get_next_(), nullptr); } void