diff --git a/doc/fibers.qbk b/doc/fibers.qbk index 5608b6db..8ceb4cb3 100644 --- a/doc/fibers.qbk +++ b/doc/fibers.qbk @@ -155,7 +155,8 @@ [def __cond_any_wait_for__ [member_link condition_variable_any..wait_for]] [def __cond_any_wait__ [member_link condition_variable_any..wait]] [def __cond_any_wait_until__ [member_link condition_variable_any..wait_until]] -[def __context_migrate__ [member_link context..migrate]] +[def __context_attach__ [member_link context..attach]] +[def __context_detach__ [member_link context..detach]] [def __detach__ [member_link fiber..detach]] [def __fiber_id__ [dblink class_fiber_id..`fiber::id`]] [def __fsp__ [class_link fiber_specific_ptr]] @@ -163,7 +164,6 @@ [def __get_id__ [member_link fiber..get_id]] [def __io_service__ [@http://www.boost.org/doc/libs/release/doc/html/boost_asio/reference/io_service.html `boost::asio::io_service`]] [def __join__ [member_link fiber..join]] -[def __migrate__ [member_link context..migrate]] [def __mutex_lock__ [member_link mutex..lock]] [def __mutex_try_lock__ [member_link mutex..try_lock]] [def __run_service__ `boost::fibers::asio::run_svc()`] diff --git a/doc/migration.qbk b/doc/migration.qbk index b9218984..2de6a8a9 100644 --- a/doc/migration.qbk +++ b/doc/migration.qbk @@ -35,9 +35,9 @@ memory access. Only fibers that are contained in __algo__'s ready queue can migrate between threads. You cannot migrate a running fiber, nor one that is __blocked__. -In__boost_fiber__ a fiber is migrated by invoking __context_migrate__ on the -[class_link context] instance for a fiber already associated with the -destination thread, passing the `context` for the fiber to be migrated. +In __boost_fiber__ a fiber is migrated by invoking __context_detach__ within the +thread from which the fiber migrates and __context_attach__ within the +thread to which the fiber migrates. 
[heading Example of work sharing] diff --git a/doc/rationale.qbk b/doc/rationale.qbk index f0639028..91bef14d 100644 --- a/doc/rationale.qbk +++ b/doc/rationale.qbk @@ -66,9 +66,10 @@ See also [link condition_variable_spurious_wakeups No Spurious Wakeups]. [heading migrating fibers between threads] Support for migrating fibers between threads has been integrated. The -user-defined scheduler must call __migrate__ on a fiber-context on the -destination thread, passing `migrate()` the fiber-context to migrate. (For -more information about custom schedulers, see [link custom Customization].) +user-defined scheduler must call __context_detach__ on a fiber-context on the +source thread and __context_attach__ on the destination thread, passing the +fiber-context to migrate. (For more information about custom schedulers, see +[link custom Customization].) Examples `work_sharing` and `work_stealing` in directory `examples` might be used as a blueprint. diff --git a/doc/scheduling.qbk b/doc/scheduling.qbk index 9ba57051..4bf9ce85 100644 --- a/doc/scheduling.qbk +++ b/doc/scheduling.qbk @@ -436,7 +436,8 @@ of typical STL containers. 
id get_id() const noexcept; - void migrate( context *) noexcept; + void detach() noexcept; + void attach( context *) noexcept; bool is_context( type) const noexcept; @@ -484,12 +485,21 @@ default-constructed __fiber_id__.]] [[See also:] [[member_link fiber..get_id]]] ] -[member_heading context..migrate] +[member_heading context..attach] - void migrate( context * f) noexcept; + void attach( context * f) noexcept; [variablelist -[[Effects:] [Migrate fiber `f` to scheduler running `*this`.]] +[[Effects:] [Attach fiber `f` to scheduler running `*this`.]] +[[Throws:] [Nothing]] +] + +[member_heading context..detach] + + void detach() noexcept; + +[variablelist +[[Effects:] [Detach fiber `*this` from the scheduler running `*this`.]] +[[Throws:] [Nothing]] ] diff --git a/examples/work_sharing.cpp b/examples/work_sharing.cpp index b8a33e8c..bd04b7f8 100644 --- a/examples/work_sharing.cpp +++ b/examples/work_sharing.cpp @@ -52,6 +52,7 @@ public: >*/ local_queue_.push( ctx); } else { + ctx->detach(); lock_t lk(rqueue_mtx_); /*< worker fiber, enqueue on shared queue >*/ @@ -70,7 +71,7 @@ public: rqueue_.pop(); lk.unlock(); BOOST_ASSERT( nullptr != ctx); - boost::fibers::context::active()->migrate( ctx); /*< + boost::fibers::context::active()->attach( ctx); /*< attach context to current scheduler via the active fiber of this thread; benign if the fiber already belongs to this thread diff --git a/examples/work_stealing.cpp b/examples/work_stealing.cpp index ec77fc50..fe9f3a7d 100644 --- a/examples/work_stealing.cpp +++ b/examples/work_stealing.cpp @@ -21,6 +21,9 @@ #include +static std::atomic< int > count{ 0 }; +static std::atomic< bool > fini{ false }; + class work_stealing_queue { private: typedef std::deque< boost::fibers::context * > rqueue_t; @@ -62,9 +65,6 @@ public: BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); - //BOOST_ASSERT( ! 
ctx->worker_is_linked() ); - // attach context to current scheduler - boost::fibers::context::active()->migrate( ctx); } return ctx; } @@ -101,11 +101,18 @@ public: virtual void awakened( boost::fibers::context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); + if ( ! ctx->is_context( boost::fibers::type::pinned_context) ) { + ctx->detach(); + } rqueue_->push_back( ctx); } virtual boost::fibers::context * pick_next() noexcept { - return rqueue_->pick_next(); + boost::fibers::context * ctx = rqueue_->pick_next(); + if ( nullptr != ctx && ! ctx->is_context( boost::fibers::type::pinned_context) ) { + boost::fibers::context::active()->attach( ctx); + } + return ctx; } virtual bool has_ready_fibers() const noexcept { @@ -139,15 +146,13 @@ private: rqueue_t rqueue_{}; std::shared_ptr< ws_rqueue_t > ws_rqueue_; - std::atomic< int > * count_; std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; public: - tief_algo( std::shared_ptr< ws_rqueue_t > ws_rqueue, std::atomic< int > * count) : - ws_rqueue_( ws_rqueue), - count_( count) { + tief_algo( std::shared_ptr< ws_rqueue_t > ws_rqueue) : + ws_rqueue_( ws_rqueue) { } virtual void awakened( boost::fibers::context * ctx) noexcept { @@ -162,12 +167,12 @@ public: rqueue_.pop_front(); BOOST_ASSERT( nullptr != ctx); if ( rqueue_.empty() ) { - // we have no more fiber in the queue // try stealing a fiber from the other thread boost::fibers::context * stolen = ws_rqueue_->steal(); if ( nullptr != stolen) { - ++( * count_); - rqueue_.push_back( * stolen); + ++count; + boost::fibers::context::active()->attach( stolen); + stolen->ready_link( rqueue_); } } } @@ -222,10 +227,10 @@ boost::fibers::future< int > fibonacci( int n) { return f; } -void thread( std::shared_ptr< work_stealing_queue > ws_queue, std::atomic< int > * count, std::atomic< bool > * fini) { - boost::fibers::use_scheduling_algorithm< tief_algo >( ws_queue, count); +void thread( std::shared_ptr< work_stealing_queue > ws_queue) { + 
boost::fibers::use_scheduling_algorithm< tief_algo >( ws_queue); - while ( ! ( * fini) ) { + while ( ! fini) { // To guarantee progress, we must ensure that // threads that have work to do are not unreasonably delayed by (thief) threads // which are idle except for task-stealing. @@ -244,17 +249,14 @@ int main() { for ( int i = 0; i < 10; ++i) { BOOST_ASSERT( ! ws_queue->has_work_items() ); - std::atomic< int > count( 0); - std::atomic< bool > fini( false); + count = 0; int n = 10; // launch a couple threads to help process them std::thread threads[] = { - std::thread( thread, ws_queue, & count, & fini), - std::thread( thread, ws_queue, & count, & fini), - std::thread( thread, ws_queue, & count, & fini), - std::thread( thread, ws_queue, & count, & fini), - std::thread( thread, ws_queue, & count, & fini) + std::thread( thread, ws_queue), + std::thread( thread, ws_queue), + std::thread( thread, ws_queue) }; // main fiber computes fibonacci( n) diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 5a6e22c9..e8bb67eb 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -162,7 +162,7 @@ private: type type_; #endif launch_policy lpol_{ launch_policy::post }; - scheduler * scheduler_{ nullptr }; + std::atomic< scheduler * > scheduler_{ nullptr }; #if (BOOST_EXECUTION_CONTEXT==1) boost::context::execution_context ctx_; #else @@ -457,9 +457,9 @@ public: void worker_unlink() noexcept; - void attach( context *) noexcept; + void detach() noexcept; - void migrate( context *) noexcept; + void attach( context *) noexcept; friend void intrusive_ptr_add_ref( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index f2c331af..a04657db 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -80,7 +80,6 @@ private: sleep_queue_t sleep_queue_{}; bool shutdown_{ false }; detail::spinlock 
remote_ready_splk_{}; - detail::spinlock worker_splk_{}; context * get_next_() noexcept; diff --git a/src/context.cpp b/src/context.cpp index d0a37c61..959773fc 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -176,7 +176,7 @@ context::resume_( detail::data_t & d) noexcept { void context::set_ready_( context * ctx) noexcept { - scheduler_->set_ready( ctx); + scheduler_.load()->set_ready( ctx); } // main fiber context @@ -288,12 +288,12 @@ context::resume( context * ready_ctx) noexcept { void context::suspend() noexcept { - scheduler_->suspend(); + scheduler_.load()->suspend(); } void context::suspend( detail::spinlock_lock & lk) noexcept { - scheduler_->suspend( lk); + scheduler_.load()->suspend( lk); } void @@ -310,7 +310,7 @@ context::join() { active_ctx->wait_link( wait_queue_); lk.unlock(); // suspend active context - scheduler_->suspend(); + scheduler_.load()->suspend(); // remove from wait-queue active_ctx->wait_unlink(); // active context resumed @@ -321,7 +321,7 @@ context::join() { void context::yield() noexcept { // yield active context - scheduler_->yield( context::active() ); + scheduler_.load()->yield( context::active() ); } #if (BOOST_EXECUTION_CONTEXT>1) @@ -363,9 +363,9 @@ context::set_terminated() noexcept { fss_data_.clear(); // switch to another context #if (BOOST_EXECUTION_CONTEXT==1) - scheduler_->set_terminated( this); + scheduler_.load()->set_terminated( this); #else - return scheduler_->set_terminated( this); + return scheduler_.load()->set_terminated( this); #endif } @@ -373,7 +373,7 @@ bool context::wait_until( std::chrono::steady_clock::time_point const& tp) noexcept { BOOST_ASSERT( nullptr != scheduler_); BOOST_ASSERT( this == active_); - return scheduler_->wait_until( this, tp); + return scheduler_.load()->wait_until( this, tp); } bool @@ -381,7 +381,7 @@ context::wait_until( std::chrono::steady_clock::time_point const& tp, detail::spinlock_lock & lk) noexcept { BOOST_ASSERT( nullptr != scheduler_); BOOST_ASSERT( this == active_); 
- return scheduler_->wait_until( this, tp, lk); + return scheduler_.load()->wait_until( this, tp, lk); } void @@ -395,10 +395,10 @@ context::set_ready( context * ctx) noexcept { // (other scheduler assigned) if ( scheduler_ == ctx->scheduler_) { // local - scheduler_->set_ready( ctx); + scheduler_.load()->set_ready( ctx); } else { // remote - ctx->scheduler_->set_remote_ready( ctx); + ctx->scheduler_.load()->set_remote_ready( ctx); } } @@ -490,19 +490,15 @@ context::wait_unlink() noexcept { } void -context::attach( context * ctx) noexcept { - BOOST_ASSERT( nullptr != ctx); - scheduler_->attach_worker_context( ctx); +context::detach() noexcept { + BOOST_ASSERT( context::active() != this); + scheduler_.load()->detach_worker_context( this); } void -context::migrate( context * ctx) noexcept { +context::attach( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( context::active() != ctx); - if ( scheduler_ != ctx->scheduler_) { - ctx->scheduler_->detach_worker_context( ctx); - scheduler_->attach_worker_context( ctx); - } + scheduler_.load()->attach_worker_context( ctx); } }} diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 297a5767..74aec1d9 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -43,9 +43,7 @@ scheduler::release_terminated_() noexcept { BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ! ctx->sleep_is_linked() ); // remove context from worker-queue - std::unique_lock< detail::spinlock > lk( worker_splk_); ctx->worker_unlink(); - lk.unlock(); // remove context from terminated-queue i = terminated_queue_.erase( i); // if last reference, e.g. 
fiber::join() or fiber::detach() @@ -112,7 +110,6 @@ scheduler::~scheduler() { // by joining dispatcher-context dispatcher_ctx_->join(); // no context' in worker-queue - std::unique_lock< detail::spinlock > lk( worker_splk_); BOOST_ASSERT( worker_queue_.empty() ); BOOST_ASSERT( terminated_queue_.empty() ); BOOST_ASSERT( remote_ready_queue_.empty() ); @@ -135,9 +132,7 @@ scheduler::dispatch() noexcept { #endif BOOST_ASSERT( context::active() == dispatcher_ctx_); for (;;) { - std::unique_lock< detail::spinlock > lk( worker_splk_); bool no_worker = worker_queue_.empty(); - lk.unlock(); if ( shutdown_) { // notify sched-algorithm about termination sched_algo_->notify(); @@ -390,8 +385,8 @@ scheduler::attach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); - std::unique_lock< detail::spinlock > lk( worker_splk_); BOOST_ASSERT( ! ctx->worker_is_linked() ); + BOOST_ASSERT( nullptr == ctx->scheduler_); ctx->worker_link( worker_queue_); ctx->scheduler_ = this; } @@ -404,8 +399,9 @@ scheduler::detach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); - std::unique_lock< detail::spinlock > lk( worker_splk_); + BOOST_ASSERT( ! ctx->is_context( type::pinned_context) ); ctx->worker_unlink(); + ctx->scheduler_ = nullptr; } }}