diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp
index 7530c09a..182c1419 100644
--- a/include/boost/fiber/condition.hpp
+++ b/include/boost/fiber/condition.hpp
@@ -74,14 +74,13 @@ public:
         // in order notify (resume) this fiber later
         BOOST_ASSERT( ! f->wait_is_linked() );
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // unlock external
         lt.unlock();
 
         // suspend this fiber
-        // locked spinlock will be released if this fiber
-        // was stored inside manager's waiting-queue
-        f->do_wait( lk);
+        f->do_schedule();
 
         // lock external again before returning
         lt.lock();
@@ -105,14 +104,13 @@ public:
         // in order notify (resume) this fiber later
         BOOST_ASSERT( ! f->wait_is_linked() );
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // unlock external
         lt.unlock();
 
         // suspend this fiber
-        // locked spinlock will be released if this fiber
-        // was stored inside manager's waiting-queue
-        if ( ! f->do_wait_until( timeout_time, lk) ) {
+        if ( ! f->do_wait_until( timeout_time) ) {
             // this fiber was not notified before timeout
             // lock spinlock again
             detail::spinlock_lock lk( splk_);
diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp
index 9820d945..3541d76c 100644
--- a/include/boost/fiber/context.hpp
+++ b/include/boost/fiber/context.hpp
@@ -80,7 +80,9 @@ private:
         }
     };
 
-    typedef std::map< uintptr_t, fss_data > fss_data_t;
+    typedef std::map< uintptr_t, fss_data > fss_data_t;
+    typedef std::vector< context * > wait_queue_t;
+    //typedef detail::wait_queue< context > wait_queue_t;
 
     static thread_local context * active_;
 
@@ -97,13 +99,12 @@ private:
     scheduler * scheduler_;
     boost::context::execution_context ctx_;
     fss_data_t fss_data_;
-    std::vector< context * > wait_queue_;
+    wait_queue_t wait_queue_;
     std::exception_ptr except_;
     std::chrono::steady_clock::time_point tp_;
     fiber_properties * properties_;
 
-    bool do_wait_until_( std::chrono::steady_clock::time_point const&,
-                         detail::spinlock_lock &);
+    bool do_wait_until_( std::chrono::steady_clock::time_point const&);
 
 protected:
     virtual void deallocate() {
@@ -373,26 +374,19 @@ public:
 
     void do_schedule();
 
-    void do_wait( detail::spinlock_lock &);
-
     template< typename Clock, typename Duration >
-    bool do_wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time_,
-                        detail::spinlock_lock & lk) {
+    bool do_wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time_) {
         std::chrono::steady_clock::time_point timeout_time(
                 detail::convert_tp( timeout_time_) );
-        return do_wait_until_( timeout_time, lk);
-    }
-
-    template< typename Rep, typename Period >
-    bool do_wait_for( std::chrono::duration< Rep, Period > const& timeout_duration,
-                      detail::spinlock_lock & lk) {
-        return do_wait_until_( std::chrono::steady_clock::now() + timeout_duration, lk);
+        return do_wait_until_( timeout_time);
     }
 
     void do_yield();
 
     void do_join( context *);
 
+    void do_signal( context *);
+    std::size_t do_ready_fibers() const noexcept;
 
     void do_set_sched_algo( std::unique_ptr< sched_algorithm >);
 
diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp
index e2c125ed..ca1a0914 100644
--- a/include/boost/fiber/scheduler.hpp
+++ b/include/boost/fiber/scheduler.hpp
@@ -60,13 +60,14 @@ public:
     void wait( context *, detail::spinlock_lock &);
 
     bool wait_until( context *,
-                     std::chrono::steady_clock::time_point const&,
-                     detail::spinlock_lock &);
+                     std::chrono::steady_clock::time_point const&);
 
     void yield( context *);
 
     void join( context *,context *);
 
+    void signal( context *);
+
     size_t ready_fibers() const noexcept;
 
     void set_sched_algo( std::unique_ptr< sched_algorithm >);
diff --git a/src/condition.cpp b/src/condition.cpp
index 3e9cd8ff..cf84ceb6 100644
--- a/src/condition.cpp
+++ b/src/condition.cpp
@@ -38,23 +38,22 @@ condition::notify_one() {
 
     // notify waiting fiber
     if ( nullptr != f) {
-        f->set_ready();
+        context::active()->do_signal( f);
     }
 }
 
 void
 condition::notify_all() {
-    wait_queue_t waiting;
+    wait_queue_t tmp;
 
     detail::spinlock_lock lk( splk_);
     // get all waiting fibers
-    waiting.swap( wait_queue_);
+    tmp.swap( wait_queue_);
     lk.unlock();
 
     // notify all waiting fibers
-    for ( context & f : waiting) {
-        f.set_ready();
-        // f->wait_unlink(); ?
+    for ( context & f : tmp) {
+        context::active()->do_signal( & f);
     }
 }
 
diff --git a/src/context.cpp b/src/context.cpp
index 7add37c3..1e56b908 100644
--- a/src/context.cpp
+++ b/src/context.cpp
@@ -49,18 +49,16 @@ void
 context::release() {
     BOOST_ASSERT( is_terminated() );
 
-    std::vector< context * > waiting;
+    wait_queue_t tmp;
 
     // get all waiting fibers
     splk_.lock();
-    waiting.swap( wait_queue_);
+    tmp.swap( wait_queue_);
     splk_.unlock();
 
     // notify all waiting fibers
-    for ( context * f : waiting) {
-        BOOST_ASSERT( nullptr != f);
-        BOOST_ASSERT( ! f->is_terminated() );
-        f->set_ready();
+    for ( context * f : tmp) {
+        do_signal( f);
     }
 
     // release fiber-specific-data
@@ -146,12 +144,11 @@ context::set_properties( fiber_properties * props) {
 }
 
 bool
-context::do_wait_until_( std::chrono::steady_clock::time_point const& time_point,
-                         detail::spinlock_lock & lk) {
+context::do_wait_until_( std::chrono::steady_clock::time_point const& time_point) {
     BOOST_ASSERT( nullptr != scheduler_);
     BOOST_ASSERT( this == active_);
 
-    return scheduler_->wait_until( this, time_point, lk);
+    return scheduler_->wait_until( this, time_point);
 }
 
 void
@@ -170,14 +167,6 @@ context::do_schedule() {
     scheduler_->run( this);
 }
 
-void
-context::do_wait( detail::spinlock_lock & lk) {
-    BOOST_ASSERT( nullptr != scheduler_);
-    BOOST_ASSERT( this == active_);
-
-    scheduler_->wait( this, lk);
-}
-
 void
 context::do_yield() {
     BOOST_ASSERT( nullptr != scheduler_);
@@ -191,8 +180,26 @@ context::do_join( context * f) {
     BOOST_ASSERT( nullptr != scheduler_);
     BOOST_ASSERT( this == active_);
     BOOST_ASSERT( nullptr != f);
+    BOOST_ASSERT( f != active_);
 
-    scheduler_->join( this, f);
+    if ( f->join( this) ) {
+        scheduler_->run( this);
+    }
+}
+
+void
+context::do_signal( context * f) {
+    BOOST_ASSERT( nullptr != scheduler_);
+    BOOST_ASSERT( this == active_);
+    BOOST_ASSERT( nullptr != f);
+
+    if ( scheduler_ == f->scheduler_) {
+        // local scheduler
+        scheduler_->signal( f);
+    } else {
+        // scheduler in another thread
+        f->scheduler_->signal( f);
+    }
 }
 
 std::size_t
diff --git a/src/mutex.cpp b/src/mutex.cpp
index 08f14eb0..b2f694a0 100644
--- a/src/mutex.cpp
+++ b/src/mutex.cpp
@@ -12,7 +12,6 @@
 
 #include "boost/fiber/scheduler.hpp"
 #include "boost/fiber/interruption.hpp"
-#include "boost/fiber/operations.hpp"
 
 #ifdef BOOST_HAS_ABI_HEADERS
 # include BOOST_ABI_PREFIX
@@ -29,7 +28,7 @@ mutex::lock_if_unlocked_() {
 
     state_ = mutex_status::locked;
     BOOST_ASSERT( ! owner_);
-    owner_ = this_fiber::get_id();
+    owner_ = context::active()->get_id();
     return true;
 }
 
@@ -58,10 +57,12 @@ mutex::lock() {
 
         // store this fiber in order to be notified later
         BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber
-        context::active()->do_wait( lk);
+        f->do_schedule();
     }
 }
 
@@ -76,14 +77,14 @@ mutex::try_lock() {
     lk.unlock();
 
     // let other fiber release the lock
-    this_fiber::yield();
+    context::active()->do_yield();
 
     return false;
 }
 
 void
 mutex::unlock() {
     BOOST_ASSERT( mutex_status::locked == state_);
-    BOOST_ASSERT( this_fiber::get_id() == owner_);
+    BOOST_ASSERT( context::active()->get_id() == owner_);
 
     detail::spinlock_lock lk( splk_);
     context * f( nullptr);
@@ -97,8 +98,7 @@ mutex::unlock() {
     lk.unlock();
 
     if ( nullptr != f) {
-        BOOST_ASSERT( ! f->is_terminated() );
-        f->set_ready();
+        context::active()->do_signal( f);
     }
 }
 
diff --git a/src/recursive_mutex.cpp b/src/recursive_mutex.cpp
index bb43f385..1f54477c 100644
--- a/src/recursive_mutex.cpp
+++ b/src/recursive_mutex.cpp
@@ -12,7 +12,6 @@
 
 #include "boost/fiber/scheduler.hpp"
 #include "boost/fiber/interruption.hpp"
-#include "boost/fiber/operations.hpp"
 
 #ifdef BOOST_HAS_ABI_HEADERS
 # include BOOST_ABI_PREFIX
@@ -26,10 +25,10 @@ recursive_mutex::lock_if_unlocked_() {
     if ( mutex_status::unlocked == state_) {
         state_ = mutex_status::locked;
         BOOST_ASSERT( ! owner_);
-        owner_ = this_fiber::get_id();
+        owner_ = context::active()->get_id();
         ++count_;
         return true;
-    } else if ( this_fiber::get_id() == owner_) {
+    } else if ( context::active()->get_id() == owner_) {
         ++count_;
         return true;
     }
@@ -64,10 +63,12 @@
 
         // store this fiber in order to be notified later
         BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber
-        context::active()->do_wait( lk);
+        f->do_schedule();
     }
 }
 
@@ -82,14 +83,14 @@ recursive_mutex::try_lock() {
     lk.unlock();
 
     // let other fiber release the lock
-    this_fiber::yield();
+    context::active()->do_yield();
 
     return false;
 }
 
 void
 recursive_mutex::unlock() {
     BOOST_ASSERT( mutex_status::locked == state_);
-    BOOST_ASSERT( this_fiber::get_id() == owner_);
+    BOOST_ASSERT( context::active()->get_id() == owner_);
 
     detail::spinlock_lock lk( splk_);
     context * f( nullptr);
@@ -104,8 +105,7 @@ recursive_mutex::unlock() {
     lk.unlock();
 
     if ( nullptr != f) {
-        BOOST_ASSERT( ! f->is_terminated() );
-        f->set_ready();
+        context::active()->do_signal( f);
     }
 }
 }
diff --git a/src/recursive_timed_mutex.cpp b/src/recursive_timed_mutex.cpp
index f805f627..bf7e54cb 100644
--- a/src/recursive_timed_mutex.cpp
+++ b/src/recursive_timed_mutex.cpp
@@ -12,7 +12,6 @@
 
 #include "boost/fiber/scheduler.hpp"
 #include "boost/fiber/interruption.hpp"
-#include "boost/fiber/operations.hpp"
 
 #ifdef BOOST_HAS_ABI_HEADERS
 # include BOOST_ABI_PREFIX
@@ -26,10 +25,10 @@ recursive_timed_mutex::lock_if_unlocked_() {
     if ( mutex_status::unlocked == state_) {
         state_ = mutex_status::locked;
         BOOST_ASSERT( ! owner_);
-        owner_ = this_fiber::get_id();
+        owner_ = context::active()->get_id();
         ++count_;
         return true;
-    } else if ( this_fiber::get_id() == owner_) {
+    } else if ( context::active()->get_id() == owner_) {
         ++count_;
         return true;
     }
@@ -64,10 +63,12 @@
 
         // store this fiber in order to be notified later
         BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber
-        context::active()->do_wait( lk);
+        f->do_schedule();
     }
 }
 
@@ -82,7 +83,7 @@ recursive_timed_mutex::try_lock() {
     lk.unlock();
 
     // let other fiber release the lock
-    this_fiber::yield();
+    context::active()->do_yield();
 
     return false;
 }
@@ -103,10 +104,12 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co
 
         // store this fiber in order to be notified later
         BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber until notified or timed-out
-        if ( ! context::active()->do_wait_until( timeout_time, lk) ) {
+        if ( ! f->do_wait_until( timeout_time) ) {
             lk.lock();
             f->wait_unlink();
             lk.unlock();
@@ -118,7 +121,7 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co
 void
 recursive_timed_mutex::unlock() {
     BOOST_ASSERT( mutex_status::locked == state_);
-    BOOST_ASSERT( this_fiber::get_id() == owner_);
+    BOOST_ASSERT( context::active()->get_id() == owner_);
 
     detail::spinlock_lock lk( splk_);
     context * f( nullptr);
@@ -133,8 +136,7 @@ recursive_timed_mutex::unlock() {
     lk.unlock();
 
     if ( nullptr != f) {
-        BOOST_ASSERT( ! f->is_terminated() );
-        f->set_ready();
+        context::active()->do_signal( f);
     }
 }
 }
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index da2579db..3007b9ad 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -233,26 +233,12 @@ scheduler::run( context * af) {
     }
 }
 
-void
-scheduler::wait( context * af, detail::spinlock_lock & lk) {
-    wait_until(
-        af,
-        std::chrono::steady_clock::time_point(
-            (std::chrono::steady_clock::duration::max)() ),
-        lk);
-}
-
 bool
 scheduler::wait_until( context * af,
-                       std::chrono::steady_clock::time_point const& timeout_time,
-                       detail::spinlock_lock & lk) {
+                       std::chrono::steady_clock::time_point const& timeout_time) {
     BOOST_ASSERT( nullptr != af);
     BOOST_ASSERT( context::active() == af);
-    BOOST_ASSERT( af->is_running() );
-    // set active-fiber to state_waiting
-    af->set_waiting();
-    // release lock
-    lk.unlock();
+    BOOST_ASSERT( af->is_running() || af->is_waiting() );
     // push active-fiber to waiting-queue
     af->time_point( timeout_time);
     BOOST_ASSERT( ! af->sleep_is_linked() );
@@ -311,6 +297,16 @@ scheduler::join( context * af, context * f) {
     BOOST_ASSERT( f->is_terminated() );
 }
 
+void
+scheduler::signal( context * f) {
+    BOOST_ASSERT( nullptr != f);
+    BOOST_ASSERT( ! f->is_terminated() );
+    // set fiber to state_ready
+    f->set_ready();
+    // put ready fiber to ready-queue
+    ready_queue_.push_back( * f);
+}
+
 void
 scheduler::set_sched_algo( std::unique_ptr< sched_algorithm > algo) {
     sched_algo_ = std::move( algo);
diff --git a/src/timed_mutex.cpp b/src/timed_mutex.cpp
index 475ed2b2..745a989f 100644
--- a/src/timed_mutex.cpp
+++ b/src/timed_mutex.cpp
@@ -12,7 +12,6 @@
 
 #include "boost/fiber/scheduler.hpp"
 #include "boost/fiber/interruption.hpp"
-#include "boost/fiber/operations.hpp"
 
 #ifdef BOOST_HAS_ABI_HEADERS
 # include BOOST_ABI_PREFIX
@@ -29,7 +28,7 @@ timed_mutex::lock_if_unlocked_() {
 
     state_ = mutex_status::locked;
     BOOST_ASSERT( ! owner_);
-    owner_ = this_fiber::get_id();
+    owner_ = context::active()->get_id();
     return true;
 }
 
@@ -58,10 +57,12 @@ timed_mutex::lock() {
 
         // store this fiber in order to be notified later
         BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber
-        context::active()->do_wait( lk);
+        f->do_schedule();
     }
 }
 
@@ -76,7 +77,7 @@ timed_mutex::try_lock() {
     lk.unlock();
 
     // let other fiber release the lock
-    this_fiber::yield();
+    context::active()->do_yield();
 
     return false;
 }
@@ -97,10 +98,12 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo
 
         // store this fiber in order to be notified later
        BOOST_ASSERT( ! f->wait_is_linked() );
+        f->set_waiting();
         wait_queue_.push_back( * f);
+        lk.unlock();
 
         // suspend this fiber until notified or timed-out
-        if ( ! context::active()->do_wait_until( timeout_time, lk) ) {
+        if ( ! context::active()->do_wait_until( timeout_time) ) {
             lk.lock();
             f->wait_unlink();
             lk.unlock();
@@ -112,7 +115,7 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo
 void
 timed_mutex::unlock() {
     BOOST_ASSERT( mutex_status::locked == state_);
-    BOOST_ASSERT( this_fiber::get_id() == owner_);
+    BOOST_ASSERT( context::active()->get_id() == owner_);
 
     detail::spinlock_lock lk( splk_);
     context * f( nullptr);
@@ -126,8 +129,7 @@ timed_mutex::unlock() {
     lk.unlock();
 
     if ( nullptr != f) {
-        BOOST_ASSERT( ! f->is_terminated() );
-        f->set_ready();
+        context::active()->do_signal( f);
     }
 }
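
Not part of the patch, but a compact way to read it: every blocking path above now follows the same protocol. The waiter enqueues itself on the object's wait_queue_ while holding the object's spinlock, drops the spinlock, and only then suspends (f->do_schedule() or f->do_wait_until(...)); the waker dequeues under the same spinlock, drops it, and then calls context::active()->do_signal(f), which forwards to scheduler::signal() on the waiter's own scheduler. The sketch below models that ordering with plain std::thread primitives so it can be compiled standalone; spinlock, waiter, wait_one and signal_one are invented names for illustration, and a condition_variable stands in for the fiber scheduler's suspend/resume.

// Toy model only -- not Boost.Fiber code; names are invented for illustration.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

// Minimal test-and-set spinlock standing in for the patch's detail::spinlock.
class spinlock {
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
public:
    void lock()   { while ( flag_.test_and_set( std::memory_order_acquire) ) {} }
    void unlock() { flag_.clear( std::memory_order_release); }
};

// Stand-in for a suspended fiber: do_schedule() blocks until do_signal().
struct waiter {
    std::mutex              mtx;
    std::condition_variable cv;
    bool                    ready = false;

    void do_schedule() {        // suspend; tolerates a signal that already arrived
        std::unique_lock< std::mutex > lk( mtx);
        cv.wait( lk, [this]{ return ready; });
    }

    void do_signal() {          // make the waiter runnable again
        { std::lock_guard< std::mutex > lk( mtx); ready = true; }
        cv.notify_one();
    }
};

static spinlock               splk;        // protects wait_queue, like splk_ above
static std::deque< waiter * > wait_queue;

void wait_one( waiter & w) {
    splk.lock();
    wait_queue.push_back( & w);  // 1. enqueue while holding the spinlock
    splk.unlock();               // 2. release the spinlock *before* suspending
    w.do_schedule();             // 3. suspend; the wake-up comes from signal_one()
}

bool signal_one() {
    splk.lock();
    waiter * w = nullptr;
    if ( ! wait_queue.empty() ) {        // 1. dequeue under the spinlock
        w = wait_queue.front();
        wait_queue.pop_front();
    }
    splk.unlock();                       // 2. release the spinlock first
    if ( nullptr != w) {                 // 3. only then wake the dequeued waiter
        w->do_signal();
        return true;
    }
    return false;
}

int main() {
    waiter w;
    std::thread t( [&w] { wait_one( w); std::cout << "waiter resumed\n"; });
    while ( ! signal_one() ) {           // retry until the waiter is enqueued
        std::this_thread::yield();
    }
    t.join();
    return 0;
}

Because the waiter publishes itself before releasing the spinlock, a wake-up that lands between the unlock and the suspend is not lost in the sketch (the predicate in do_schedule() already sees ready == true); the real scheduler::signal() / do_schedule() pair has to provide the equivalent guarantee.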