diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp index 7bd59865..9b589b43 100644 --- a/include/boost/fiber/condition.hpp +++ b/include/boost/fiber/condition.hpp @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include #include @@ -38,24 +38,16 @@ # pragma warning(disable:4355 4251 4275) # endif +#include + namespace boost { namespace fibers { class BOOST_FIBERS_DECL condition : private noncopyable { private: - enum command - { - SLEEPING = 0, - NOTIFY_ONE, - NOTIFY_ALL - }; - - atomic< command > cmd_; - atomic< std::size_t > waiters_; - mutex enter_mtx_; - detail::spinlock waiting_mtx_; - std::deque< detail::fiber_base::ptr_t > waiting_; + detail::spinlock waiting_mtx_; + std::deque< detail::notify::ptr_t > waiting_; public: condition(); @@ -76,249 +68,53 @@ public: template< typename LockType > void wait( LockType & lt) { - detail::fiber_base::ptr_t f( detail::scheduler::instance().active() ); + detail::notify::ptr_t n( detail::scheduler::instance().active() ); + try { - unique_lock< mutex > lk( enter_mtx_); - BOOST_ASSERT( lk); + if ( n) { + // store this fiber in order to be notified later unique_lock< detail::spinlock > lk( waiting_mtx_); - BOOST_ASSERT( lk); - waiting_.push_back( f); - ++waiters_; + waiting_.push_back( n); + lt.unlock(); + + // suspend fiber + detail::scheduler::instance().wait( lk); + + // check if fiber was interrupted + this_fiber::interruption_point(); } - lt.unlock(); - } - - bool unlock_enter_mtx = false; - - //Loop until a notification indicates that the fiber should exit - for (;;) - { - //The fiber sleeps/spins until a spin_condition commands a notification - //Notification occurred, we will lock the checking mutex so that - while ( SLEEPING == cmd_) - { - try - { - if ( f) - { - // check if fiber was interrupted - this_fiber::interruption_point(); - - detail::scheduler::instance().wait(); - - // check if fiber was interrupted - this_fiber::interruption_point(); - } - else - { - // run scheduler - detail::scheduler::instance().run(); - } - } - catch (...) - { - // FIXME: use multi-index container - // remove fiber from waiting_ - unique_lock< detail::spinlock > lk( waiting_mtx_); - waiting_.erase( - std::find( waiting_.begin(), waiting_.end(), f) ); - --waiters_; - throw; - } - } - - command expected = NOTIFY_ONE; - cmd_.compare_exchange_strong( expected, SLEEPING); - if ( SLEEPING == expected) - //Other fiber has been notified and since it was a NOTIFY one - //command, this fiber must sleep again - continue; - else if ( NOTIFY_ONE == expected) - { - //If it was a NOTIFY_ONE command, only this fiber should - //exit. This fiber has atomically marked command as sleep before - //so no other fiber will exit. - //Decrement wait count. - --waiters_; - unlock_enter_mtx = true; - break; - } else { - //If it is a NOTIFY_ALL command, all fibers should return - //from do_timed_wait function. Decrement wait count. + // notifier for main-fiber + n = detail::scheduler::instance().notifier(); + // store this fiber in order to be notified later unique_lock< detail::spinlock > lk( waiting_mtx_); - unlock_enter_mtx = 0 == --waiters_; + waiting_.push_back( n); + lt.unlock(); + lk.unlock(); - //Check if this is the last fiber of notify_all waiters - //Only the last fiber will release the mutex - if ( unlock_enter_mtx) - { - expected = NOTIFY_ALL; - cmd_.compare_exchange_strong( expected, SLEEPING); - } - break; + while ( ! n->woken_up() ) + { + fprintf(stdout, "condition: main-fiber not woken-up\n"); + // run scheduler + detail::scheduler::instance().run(); + } + fprintf(stdout, "condition: main-fiber woken-up\n"); } } + catch (...) + { + // remove fiber from waiting_ + unique_lock< detail::spinlock > lk( waiting_mtx_); + waiting_.erase( + std::find( waiting_.begin(), waiting_.end(), n) ); + throw; + } - //Unlock the enter mutex if it is a single notification, if this is - //the last notified fiber in a notify_all or a timeout has occurred - if ( unlock_enter_mtx) - enter_mtx_.unlock(); - - //Lock external again before returning from the method + // lock external again before returning lt.lock(); } -#if 0 - template< typename LockType, typename TimeDuration > - bool timed_wait( LockType & lt, TimeDuration const& dt) - { return timed_wait( lt, chrono::system_clock::now() + dt); } - - template< typename LockType, typename TimeDuration, typename Pred > - bool timed_wait( LockType & lt, TimeDuration const& dt, Pred pred) - { return timed_wait( lt, chrono::system_clock::now() + dt, pred); } - - template< typename LockType, typename Pred > - bool timed_wait( LockType & lt, chrono::system_clock::time_point const& abs_time, Pred pred) - { - while ( ! pred() ) - if ( ! timed_wait( lt, abs_time) ) - return pred(); - return true; - } - - template< typename LockType > - bool timed_wait( LockType & lt, chrono::system_clock::time_point const& abs_time) - { - if ( (chrono::system_clock::time_point::max)() == abs_time){ - wait( lt); - return true; - } - chrono::system_clock::time_point now( chrono::system_clock::now() ); - if ( now >= abs_time) return false; - - { - mutex::scoped_lock lk( enter_mtx_); // FIXME: abs_time! - BOOST_ASSERT( lk); - ++waiters_; - lt.unlock(); - } - - bool unlock_enter_mtx = false, timed_out = false; - //Loop until a notification indicates that the fiber should - //exit or timeout occurs - for (;;) - { - //The fiber sleeps/spins until a spin_condition commands a notification - //Notification occurred, we will lock the checking mutex so that - while ( SLEEPING == cmd_) - { - detail::fiber_base::ptr_t f( detail::scheduler::instance().active() ); - try - { - if ( f) - { - // check if fiber was interrupted - this_fiber::interruption_point(); - - this_fiber::yield(); - - // check if fiber was interrupted - this_fiber::interruption_point(); - } - else - { - // store a dummy fiber - unique_lock< detail::spinlock > lk( waiting_mtx_); - waiting_.push_back( f); - // run scheduler - run(); - } - } - catch (...) - { - --waiters_; - // FIXME: use multi-index container - // remove fiber from waiting_ - unique_lock< detail::spinlock > lk( waiting_mtx_); - waiting_.erase( - std::find( waiting_.ebgin(), waiting_.end(), f) ); - throw; - } - - now = chrono::system_clock::now(); - if ( now >= abs_time) - { - //If we can lock the mutex it means that no notification - //is being executed in this spin_condition variable - timed_out = enter_mtx_.try_lock(); - - //If locking fails, indicates that another fiber is executing - //notification, so we play the notification game - if ( ! timed_out) - //There is an ongoing notification, we will try again later - continue; - - //No notification in execution, since enter mutex is locked. - //We will execute time-out logic, so we will decrement count, - //release the enter mutex and return false. - break; - } - } - - //If a timeout occurred, the mutex will not execute checking logic - if ( timed_out) - { - unlock_enter_mtx = true; - --waiters_; - break; - } - else - { - command expected = NOTIFY_ONE; - cmd_.compare_exchange_strong( expected, SLEEPING); - if ( SLEEPING == expected) - //Other fiber has been notified and since it was a NOTIFY one - //command, this fiber must sleep again - continue; - else if ( NOTIFY_ONE == expected) - { - //If it was a NOTIFY_ONE command, only this fiber should - //exit. This fiber has atomically marked command as sleep before - //so no other fiber will exit. - //Decrement wait count. - unlock_enter_mtx = true; - --waiters_; - break; - } - else - { - //If it is a NOTIFY_ALL command, all fibers should return - //from do_timed_wait function. Decrement wait count. - unlock_enter_mtx = 0 == --waiters_; - //Check if this is the last fiber of notify_all waiters - //Only the last fiber will release the mutex - if ( unlock_enter_mtx) - { - expected = NOTIFY_ALL; - cmd_.compare_exchange_strong( expected, SLEEPING); - } - break; - } - } - } - - //Unlock the enter mutex if it is a single notification, if this is - //the last notified fiber in a notify_all or a timeout has occurred - if ( unlock_enter_mtx) - enter_mtx_.unlock(); - - //Lock external again before returning from the method - lt.lock(); - return ! timed_out; - } -#endif }; typedef condition condition_variable; diff --git a/src/condition.cpp b/src/condition.cpp index fb1a3a9d..9c0459fd 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -22,9 +22,6 @@ namespace boost { namespace fibers { condition::condition() : - cmd_( SLEEPING), - waiters_( 0), - enter_mtx_(), waiting_mtx_(), waiting_() {} @@ -35,68 +32,31 @@ condition::~condition() void condition::notify_one() { - // This mutex guarantees that no other thread can enter to the - // wait method logic, so that thread count will be - // constant until the function writes a NOTIFY_ONE command. - // It also guarantees that no other notification can be signaled - // on this condition before this one ends - enter_mtx_.lock(); + detail::notify::ptr_t n; - // Return if there are no waiters - if ( waiting_.empty() ) - { - BOOST_ASSERT( 0 == waiters_); - enter_mtx_.unlock(); - return; - } - else - { - if ( waiting_.front() ) - waiting_.front()->wake_up(); + unique_lock< detail::spinlock > lk( waiting_mtx_); + if ( ! waiting_.empty() ) { + n.swap( waiting_.front() ); waiting_.pop_front(); } + lk.unlock(); - // Notify that all fibers should execute wait logic - command expected = SLEEPING; - while ( ! cmd_.compare_exchange_strong( expected, NOTIFY_ONE) ) - { -// if ( this_fiber::is_fiberized() ) -// this_fiber::yield(); - expected = SLEEPING; - } + if ( n) + n->wake_up(); } void condition::notify_all() { - // This mutex guarantees that no other thread can enter to the - // wait method logic, so that thread count will be - // constant until the function writes a NOTIFY_ALL command. - // It also guarantees that no other notification can be signaled - // on this condition before this one ends - enter_mtx_.lock(); + std::deque< detail::notify::ptr_t > waiting; - // Return if there are no waiters - if ( waiting_.empty() ) - { - BOOST_ASSERT( 0 == waiters_); - enter_mtx_.unlock(); - return; - } - else - { - BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_) - { if ( f) f->wake_up(); } - waiting_.clear(); - } + unique_lock< detail::spinlock > lk( waiting_mtx_); + waiting.swap( waiting_); + lk.unlock(); - // Notify that all fibers should execute wait logic - command expected = SLEEPING; - while ( SLEEPING != cmd_.compare_exchange_strong( expected, NOTIFY_ALL) ) + BOOST_FOREACH( detail::notify::ptr_t const& n, waiting) { -// if ( this_fiber::is_fiberized() ) -// this_fiber::yield(); - expected = SLEEPING; + n->wake_up(); } }