diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp index 9e7c2c49..0857f574 100644 --- a/include/boost/fiber/condition.hpp +++ b/include/boost/fiber/condition.hpp @@ -71,7 +71,7 @@ public: // in order notify (resume) this fiber later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // unlock external lt.unlock(); @@ -104,7 +104,7 @@ public: // in order notify (resume) this fiber later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // unlock external lt.unlock(); diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 90fd6a6a..aaebba98 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -71,7 +71,7 @@ struct ready_tag; typedef intrusive::list_member_hook< intrusive::tag< ready_tag >, intrusive::link_mode< - intrusive::safe_link + intrusive::auto_unlink > > ready_hook; @@ -79,7 +79,7 @@ struct remote_ready_tag; typedef intrusive::list_member_hook< intrusive::tag< remote_ready_tag >, intrusive::link_mode< - intrusive::safe_link + intrusive::auto_unlink > > remote_ready_hook; @@ -166,6 +166,7 @@ private: boost::context::execution_context ctx_; public: + detail::spinlock hook_splk_; detail::worker_hook worker_hook_; detail::terminated_hook terminated_hook_; detail::ready_hook ready_hook_; @@ -288,6 +289,7 @@ public: suspend(); BOOST_ASSERT_MSG( false, "fiber already terminated"); }), + hook_splk_(), worker_hook_(), terminated_hook_(), ready_hook_(), @@ -368,6 +370,8 @@ public: bool worker_is_linked(); + bool terminated_is_linked(); + bool ready_is_linked(); bool remote_ready_is_linked(); @@ -376,8 +380,48 @@ public: bool wait_is_linked(); + template< typename List > + void worker_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + + template< typename List > + void terminated_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + + template< typename List > + void ready_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + + template< typename List > + void remote_ready_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + + template< typename Set > + void sleep_link( Set & set) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + set.insert( * this); + } + + template< typename List > + void wait_link( List & lst) { + std::unique_lock< detail::spinlock > lk( hook_splk_); + lst.push_back( * this); + } + void worker_unlink(); + void ready_unlink(); + + void remote_ready_unlink(); + void sleep_unlink(); void wait_unlink(); diff --git a/src/context.cpp b/src/context.cpp index 99180702..b64faf2a 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -58,6 +58,7 @@ context::context( main_context_t) : flags_( flag_main_context), scheduler_( nullptr), ctx_( boost::context::execution_context::current() ), + hook_splk_(), worker_hook_(), terminated_hook_(), ready_hook_(), @@ -83,6 +84,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall // dispatcher context should never return from scheduler::dispatch() BOOST_ASSERT_MSG( false, "disatcher fiber already terminated"); }), + hook_splk_(), worker_hook_(), terminated_hook_(), ready_hook_(), @@ -167,7 +169,7 @@ context::join() { // push active context to wait-queue, member // of the context which has to be joined by // the active context - wait_queue_.push_back( * active_ctx); + active_ctx->wait_link( wait_queue_); lk.unlock(); // suspend active context scheduler_->re_schedule( active_ctx); @@ -278,41 +280,67 @@ context::set_fss_data( void const * vp, bool context::worker_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); return worker_hook_.is_linked(); } +bool +context::terminated_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); + return terminated_hook_.is_linked(); +} + bool context::ready_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); return ready_hook_.is_linked(); } bool context::remote_ready_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); return remote_ready_hook_.is_linked(); } bool context::sleep_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); return sleep_hook_.is_linked(); } bool context::wait_is_linked() { + std::unique_lock< detail::spinlock > lk( hook_splk_); return wait_hook_.is_linked(); } void context::worker_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); worker_hook_.unlink(); } +void +context::ready_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); + ready_hook_.unlink(); +} + +void +context::remote_ready_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); + remote_ready_hook_.unlink(); +} + void context::sleep_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); sleep_hook_.unlink(); } void context::wait_unlink() { + std::unique_lock< detail::spinlock > lk( hook_splk_); wait_hook_.unlink(); } diff --git a/src/mutex.cpp b/src/mutex.cpp index 4d9d1fb7..c8007b0d 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -55,7 +55,7 @@ mutex::lock() { // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber ctx->suspend(); diff --git a/src/recursive_mutex.cpp b/src/recursive_mutex.cpp index 895c0407..94c1ebfb 100644 --- a/src/recursive_mutex.cpp +++ b/src/recursive_mutex.cpp @@ -68,7 +68,7 @@ recursive_mutex::lock() { // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber ctx->suspend(); diff --git a/src/recursive_timed_mutex.cpp b/src/recursive_timed_mutex.cpp index fb0ac7b5..909e4bb3 100644 --- a/src/recursive_timed_mutex.cpp +++ b/src/recursive_timed_mutex.cpp @@ -69,7 +69,7 @@ recursive_timed_mutex::lock() { // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber ctx->suspend(); @@ -109,7 +109,7 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber until notified or timed-out if ( ! ctx->wait_until( timeout_time) ) { diff --git a/src/scheduler.cpp b/src/scheduler.cpp index bd11b2b6..ff8d249b 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -121,7 +121,7 @@ scheduler::sleep2ready_() noexcept { // reset sleep-tp ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); // push new context to ready-queue - ready_queue_.push_back( * ctx); + ctx->ready_link( ready_queue_); } else { break; // first context with now < deadline } @@ -187,7 +187,7 @@ scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noex // the dispatcher-context is resumed and // scheduler::dispatch() is executed dispatcher_ctx_->set_scheduler( this); - ready_queue_.push_back( * dispatcher_ctx_.get() ); + dispatcher_ctx_->ready_link( ready_queue_); } void @@ -226,7 +226,7 @@ scheduler::dispatch() { } // push dispatcher-context to ready-queue // so that ready-queue never becomes empty - ready_queue_.push_back( * dispatcher_ctx_); + dispatcher_ctx_->ready_link( ready_queue_); resume_( dispatcher_ctx_.get(), ctx); BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } @@ -258,7 +258,7 @@ scheduler::set_ready( context * ctx) noexcept { // attach context to `this`-scheduler ctx->set_scheduler( this); // push to the worker-queue - worker_queue_.push_back( * ctx); + ctx->worker_link( worker_queue_); } } else { // sanity checks, main-contxt might by signaled @@ -278,7 +278,7 @@ scheduler::set_ready( context * ctx) noexcept { // signaled to interrupt if ( ! ctx->ready_is_linked() ) { // push new context to ready-queue - ready_queue_.push_back( * ctx); + ctx->ready_link( ready_queue_); } } @@ -296,7 +296,7 @@ scheduler::set_remote_ready( context * ctx) noexcept { // protect for concurrent access std::unique_lock< detail::spinlock > lk( remote_ready_splk_); // push new context to remote ready-queue - remote_ready_queue_.push_back( * ctx); + ctx->remote_ready_link( remote_ready_queue_); } void @@ -312,7 +312,7 @@ scheduler::set_terminated( context * ctx) noexcept { // store the terminated fiber in the terminated-queue // the dispatcher-context will call // intrusive_ptr_release( ctx); - terminated_queue_.push_back( * ctx); + ctx->terminated_link( terminated_queue_); } void @@ -328,7 +328,7 @@ scheduler::yield( context * active_ctx) noexcept { // context::wait_is_linked() is not sychronized // with other threads // push active context to ready-queue - ready_queue_.push_back( * active_ctx); + active_ctx->ready_link( ready_queue_); // resume another fiber resume_( active_ctx, get_next_() ); } @@ -356,7 +356,7 @@ scheduler::wait_until( context * active_ctx, // with other threads // push active context to sleep-queue active_ctx->tp_ = sleep_tp; - sleep_queue_.insert( * active_ctx); + active_ctx->sleep_link( sleep_queue_); // resume another context resume_( active_ctx, get_next_() ); // context has been resumed diff --git a/src/timed_mutex.cpp b/src/timed_mutex.cpp index 38685e4d..8ecc5e1f 100644 --- a/src/timed_mutex.cpp +++ b/src/timed_mutex.cpp @@ -56,7 +56,7 @@ timed_mutex::lock() { // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber ctx->suspend(); @@ -96,7 +96,7 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo // store this fiber in order to be notified later detail::spinlock_lock lk( wait_queue_splk_); BOOST_ASSERT( ! ctx->wait_is_linked() ); - wait_queue_.push_back( * ctx); + ctx->wait_link( wait_queue_); lk.unlock(); // suspend this fiber until notified or timed-out if ( ! context::active()->wait_until( timeout_time) ) {