From 57a33062d8a8c68eac8f55d26c29f788ca0ced08 Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Tue, 1 Jan 2013 15:29:39 +0100 Subject: [PATCH] spin_mutex used --- include/boost/fiber/algorithm.hpp | 5 +- include/boost/fiber/auto_reset_event.hpp | 9 +- include/boost/fiber/bounded_channel.hpp | 30 +++-- include/boost/fiber/condition.hpp | 136 ++++++++++++++------- include/boost/fiber/count_down_event.hpp | 10 +- include/boost/fiber/detail/fiber_base.hpp | 12 +- include/boost/fiber/manual_reset_event.hpp | 35 +++--- include/boost/fiber/mutex.hpp | 2 + include/boost/fiber/round_robin.hpp | 5 +- src/auto_reset_event.cpp | 61 +++++---- src/barrier.cpp | 11 +- src/condition.cpp | 47 ++++--- src/count_down_event.cpp | 61 +++++---- src/detail/fiber_base.cpp | 3 - src/detail/spin_mutex.cpp | 2 +- src/manual_reset_event.cpp | 57 +++++---- src/mutex.cpp | 49 +++----- src/round_robin.cpp | 84 +++++++------ 18 files changed, 331 insertions(+), 288 deletions(-) diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index c413ee25..c3591fd7 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -36,15 +37,13 @@ struct BOOST_FIBERS_DECL algorithm : private noncopyable virtual void cancel( detail::fiber_base::ptr_t const&) = 0; - virtual void notify( detail::fiber_base::ptr_t const&) = 0; - virtual detail::fiber_base::ptr_t active() = 0; virtual void sleep( chrono::system_clock::time_point const& abs_time) = 0; virtual bool run() = 0; - virtual void wait() = 0; + virtual void wait( detail::spin_mutex::scoped_lock &) = 0; virtual void yield() = 0; diff --git a/include/boost/fiber/auto_reset_event.hpp b/include/boost/fiber/auto_reset_event.hpp index 05f40c49..80564d27 100644 --- a/include/boost/fiber/auto_reset_event.hpp +++ b/include/boost/fiber/auto_reset_event.hpp @@ -9,12 +9,14 @@ #include +#include #include #include #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -37,10 +39,9 @@ private: RESET }; - state state_; - std::deque< - detail::fiber_base::ptr_t - > waiting_; + atomic< state > state_; + detail::spin_mutex waiting_mtx_; + std::deque< detail::fiber_base::ptr_t > waiting_; public: explicit auto_reset_event( bool = false); diff --git a/include/boost/fiber/bounded_channel.hpp b/include/boost/fiber/bounded_channel.hpp index ef94a182..6e70f68d 100644 --- a/include/boost/fiber/bounded_channel.hpp +++ b/include/boost/fiber/bounded_channel.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -78,8 +79,8 @@ private: DEACTIVE }; - state state_; - std::size_t count_; + atomic< state > state_; + atomic< std::size_t > count_; typename node_type::ptr head_; mutable mutex head_mtx_; typename node_type::ptr tail_; @@ -173,10 +174,7 @@ public: } bool empty() const - { - mutex::scoped_lock lk( head_mtx_); - return empty_(); - } + { return empty_(); } void put( T const& t) { @@ -240,13 +238,13 @@ public: return false; if ( empty) { -// try -// { + try + { while ( active_() && empty_() ) not_empty_cond_.wait( lk); -// } -// catch ( fibers_interrupted const&) -// { return false; } + } + catch ( fiber_interrupted const&) + { return false; } } if ( ! active_() && empty_() ) return false; @@ -276,16 +274,16 @@ public: return false; if ( empty) { -// try -// { + try + { while ( active_() && empty_() ) { if ( ! not_empty_cond_.timed_wait( lk, abs_time) ) return false; } -// } -// catch ( fibers_interrupted const&) -// { return false; } + } + catch ( fiber_interrupted const&) + { return false; } } if ( ! active_() && empty_() ) return false; diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp index 514c417c..f9fad470 100644 --- a/include/boost/fiber/condition.hpp +++ b/include/boost/fiber/condition.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -47,13 +49,12 @@ private: NOTIFY_ALL }; - command cmd_; - std::size_t waiters_; - mutex enter_mtx_; - mutex check_mtx_; - std::deque< - detail::fiber_base::ptr_t - > waiting_; + atomic< command > cmd_; + atomic< std::size_t > waiters_; + mutex enter_mtx_; + mutex check_mtx_; + detail::spin_mutex waiting_mtx_; + std::deque< detail::fiber_base::ptr_t > waiting_; public: condition(); @@ -74,6 +75,8 @@ public: template< typename LockType > void wait( LockType & lt) { + BOOST_ASSERT( this_fiber::is_fiberized() ); + { mutex::scoped_lock lk( enter_mtx_); BOOST_ASSERT( lk); @@ -82,39 +85,58 @@ public: } bool unlock_enter_mtx = false; + + //Loop until a notification indicates that the thread should exit for (;;) { + //The thread sleeps/spins until a spin_condition commands a notification + //Notification occurred, we will lock the checking mutex so that while ( SLEEPING == cmd_) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().wait(); - } - else - detail::scheduler::instance().run(); + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); } - if ( NOTIFY_ONE == cmd_) - { - unlock_enter_mtx = true; - --waiters_; - cmd_ = SLEEPING; - break; - } + command expected = NOTIFY_ONE; + cmd_.compare_exchange_strong( expected, SLEEPING); + if ( SLEEPING == expected) + //Other thread has been notified and since it was a NOTIFY one + //command, this thread must sleep again + continue; + else if ( NOTIFY_ONE == expected) + { + //If it was a NOTIFY_ONE command, only this thread should + //exit. This thread has atomically marked command as sleep before + //so no other thread will exit. + //Decrement wait count. + unlock_enter_mtx = true; + --waiters_; + break; + } else { - unlock_enter_mtx = 0 == --waiters_; + //If it is a NOTIFY_ALL command, all threads should return + //from do_timed_wait function. Decrement wait count. + unlock_enter_mtx = 0 == --waiters_; + //Check if this is the last thread of notify_all waiters + //Only the last thread will release the mutex if ( unlock_enter_mtx) - cmd_ = SLEEPING; + { + expected = NOTIFY_ALL; + cmd_.compare_exchange_strong( expected, SLEEPING); + } break; } } + //Unlock the enter mutex if it is a single notification, if this is + //the last notified thread in a notify_all or a timeout has occurred if ( unlock_enter_mtx) enter_mtx_.unlock(); + //Lock external again before returning from the method lt.lock(); } @@ -138,6 +160,8 @@ public: template< typename LockType > bool timed_wait( LockType & lt, chrono::system_clock::time_point const& abs_time) { + BOOST_ASSERT( this_fiber::is_fiberized() ); + if ( (chrono::system_clock::time_point::max)() == abs_time){ wait( lt); return true; @@ -153,36 +177,40 @@ public: } bool unlock_enter_mtx = false, timed_out = false; + //Loop until a notification indicates that the thread should + //exit or timeout occurs for (;;) { + //The thread sleeps/spins until a spin_condition commands a notification + //Notification occurred, we will lock the checking mutex so that while ( SLEEPING == cmd_) { - now = chrono::system_clock::now(); - if ( now >= abs_time) - { - while ( ! ( timed_out = enter_mtx_.try_lock() ) ) - detail::scheduler::instance().yield(); - break; - } - - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().sleep( abs_time); - } - else - detail::scheduler::instance().run(); + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); now = chrono::system_clock::now(); if ( now >= abs_time) { - while ( ! ( timed_out = enter_mtx_.try_lock() ) ) - detail::scheduler::instance().yield(); + //If we can lock the mutex it means that no notification + //is being executed in this spin_condition variable + timed_out = enter_mtx_.try_lock(); + + //If locking fails, indicates that another thread is executing + //notification, so we play the notification game + if ( ! timed_out) + //There is an ongoing notification, we will try again later + continue; + + //No notification in execution, since enter mutex is locked. + //We will execute time-out logic, so we will decrement count, + //release the enter mutex and return false. break; } } + //If a timeout occurred, the mutex will not execute checking logic if ( timed_out) { unlock_enter_mtx = true; @@ -190,8 +218,18 @@ public: break; } - if ( NOTIFY_ONE == cmd_) + command expected = NOTIFY_ONE; + cmd_.compare_exchange_strong( expected, SLEEPING); + if ( SLEEPING == expected) + //Other thread has been notified and since it was a NOTIFY one + //command, this thread must sleep again + continue; + else if ( NOTIFY_ONE == expected) { + //If it was a NOTIFY_ONE command, only this thread should + //exit. This thread has atomically marked command as sleep before + //so no other thread will exit. + //Decrement wait count. unlock_enter_mtx = true; --waiters_; cmd_ = SLEEPING; @@ -199,16 +237,26 @@ public: } else { + //If it is a NOTIFY_ALL command, all threads should return + //from do_timed_wait function. Decrement wait count. unlock_enter_mtx = 0 == --waiters_; + //Check if this is the last thread of notify_all waiters + //Only the last thread will release the mutex if ( unlock_enter_mtx) - cmd_ = SLEEPING; + { + expected = NOTIFY_ALL; + cmd_.compare_exchange_strong( expected, SLEEPING); + } break; } } + //Unlock the enter mutex if it is a single notification, if this is + //the last notified thread in a notify_all or a timeout has occurred if ( unlock_enter_mtx) enter_mtx_.unlock(); + //Lock external again before returning from the method lt.lock(); return ! timed_out; } diff --git a/include/boost/fiber/count_down_event.hpp b/include/boost/fiber/count_down_event.hpp index 6f7e8344..db686635 100644 --- a/include/boost/fiber/count_down_event.hpp +++ b/include/boost/fiber/count_down_event.hpp @@ -16,6 +16,7 @@ #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -32,11 +33,10 @@ namespace fibers { class BOOST_FIBERS_DECL count_down_event : private noncopyable { private: - std::size_t initial_; - std::size_t current_; - std::deque< - detail::fiber_base::ptr_t - > waiting_; + std::size_t initial_; + atomic< std::size_t > current_; + detail::spin_mutex waiting_mtx_; + std::deque< detail::fiber_base::ptr_t > waiting_; public: explicit count_down_event( std::size_t); diff --git a/include/boost/fiber/detail/fiber_base.hpp b/include/boost/fiber/detail/fiber_base.hpp index 55333b25..76288896 100644 --- a/include/boost/fiber/detail/fiber_base.hpp +++ b/include/boost/fiber/detail/fiber_base.hpp @@ -117,10 +117,10 @@ public: { return id( ptr_t( const_cast< fiber_base * >( this) ) ); } int priority() const BOOST_NOEXCEPT - { return priority_.load(); } + { return priority_; } void priority( int prio) BOOST_NOEXCEPT - { priority_.store( prio); } + { priority_ = prio; } void resume(); @@ -142,16 +142,16 @@ public: { return 0 != ( flags_ & flag_preserve_fpu); } bool is_terminated() const BOOST_NOEXCEPT - { return state_terminated == state_.load(); } + { return state_terminated == state_; } bool is_ready() const BOOST_NOEXCEPT - { return state_ready == state_.load(); } + { return state_ready == state_; } bool is_running() const BOOST_NOEXCEPT - { return state_running == state_.load(); } + { return state_running == state_; } bool is_waiting() const BOOST_NOEXCEPT - { return state_waiting == state_.load(); } + { return state_waiting == state_; } bool set_terminated() BOOST_NOEXCEPT { diff --git a/include/boost/fiber/manual_reset_event.hpp b/include/boost/fiber/manual_reset_event.hpp index 1e20b71e..84fe0981 100644 --- a/include/boost/fiber/manual_reset_event.hpp +++ b/include/boost/fiber/manual_reset_event.hpp @@ -10,12 +10,14 @@ #include #include +#include #include #include #include #include #include +#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -33,35 +35,34 @@ namespace fibers { class BOOST_FIBERS_DECL manual_reset_event : private noncopyable { private: - enum state - { - SET = 0, - RESET - }; + enum state + { + SET = 0, + RESET + }; - state state_; - std::size_t waiters_; - mutex enter_mtx_; - std::deque< - detail::fiber_base::ptr_t - > waiting_; + atomic< state > state_; + std::size_t waiters_; + mutex enter_mtx_; + detail::spin_mutex waiting_mtx_; + std::deque< detail::fiber_base::ptr_t > waiting_; public: - explicit manual_reset_event( bool = false); + explicit manual_reset_event( bool = false); - void set(); + void set(); - void reset(); + void reset(); - void wait(); + void wait(); template< typename TimeDuration > - bool timed_wait( TimeDuration const& dt) + bool timed_wait( TimeDuration const& dt) { return timed_wait( chrono::system_clock::now() + dt); } bool timed_wait( chrono::system_clock::time_point const& abs_time); - bool try_wait(); + bool try_wait(); }; }} diff --git a/include/boost/fiber/mutex.hpp b/include/boost/fiber/mutex.hpp index e8878a13..6a4c2714 100644 --- a/include/boost/fiber/mutex.hpp +++ b/include/boost/fiber/mutex.hpp @@ -18,6 +18,7 @@ #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -42,6 +43,7 @@ private: atomic< state > state_; detail::fiber_base::id owner_; + detail::spin_mutex mtx_; std::deque< detail::fiber_base::ptr_t > waiting_; diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 4cddb110..b8420610 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -105,8 +106,6 @@ public: void cancel( detail::fiber_base::ptr_t const&); - void notify( detail::fiber_base::ptr_t const&); - detail::fiber_base::ptr_t active() BOOST_NOEXCEPT { return active_fiber_; } @@ -114,7 +113,7 @@ public: bool run(); - void wait(); + void wait( detail::spin_mutex::scoped_lock &); void yield(); diff --git a/src/auto_reset_event.cpp b/src/auto_reset_event.cpp index ba5495ec..e92ef45a 100644 --- a/src/auto_reset_event.cpp +++ b/src/auto_reset_event.cpp @@ -23,58 +23,60 @@ namespace fibers { auto_reset_event::auto_reset_event( bool isset) : state_( isset ? SET : RESET), + waiting_mtx_(), waiting_() {} void auto_reset_event::wait() { - while ( SET != state_) + BOOST_ASSERT( this_fiber::is_fiberized() ); + + while ( RESET == state_.exchange( RESET, memory_order_acquire) ) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().wait(); - } - else - detail::scheduler::instance().run(); + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); } - state_ = RESET; } bool auto_reset_event::timed_wait( chrono::system_clock::time_point const& abs_time) { - while ( SET != state_) + BOOST_ASSERT( this_fiber::is_fiberized() ); + + if ( chrono::system_clock::now() >= abs_time) return false; + + while ( RESET == state_.exchange( RESET, memory_order_acquire) ) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().sleep( abs_time); - } - else - detail::scheduler::instance().run(); + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); + + if ( chrono::system_clock::now() >= abs_time) return false; } - state_ = RESET; - return chrono::system_clock::now() <= abs_time; + + return true; } bool auto_reset_event::try_wait() { - if ( SET == state_) - { - state_ = RESET; - return true; - } - return false; + BOOST_ASSERT( this_fiber::is_fiberized() ); + + return SET == state_.exchange( RESET, memory_order_acquire); } void auto_reset_event::set() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + + state_ = SET; + + detail::spin_mutex::scoped_lock lk( waiting_mtx_); if ( ! waiting_.empty() ) { detail::fiber_base::ptr_t f; @@ -82,11 +84,8 @@ auto_reset_event::set() { f.swap( waiting_.front() ); waiting_.pop_front(); - } while ( f->is_terminated() ); - if ( f) - detail::scheduler::instance().notify( f); + } while ( ! f->is_ready() ); } - state_ = SET; } }} diff --git a/src/barrier.cpp b/src/barrier.cpp index 2dd4da6a..0feed300 100644 --- a/src/barrier.cpp +++ b/src/barrier.cpp @@ -8,8 +8,9 @@ #include "boost/fiber/barrier.hpp" -#include +#include +#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -25,7 +26,13 @@ barrier::barrier( std::size_t initial) : cycle_( true), mtx_(), cond_() -{ if ( initial == 0) throw std::invalid_argument("invalid barrier count"); } +{ + if ( 0 == initial) + boost::throw_exception( + invalid_argument( + system::errc::invalid_argument, + "boost fiber: zero initial barrier count") ); +} bool barrier::wait() diff --git a/src/condition.cpp b/src/condition.cpp index 077856b1..d6c2dba3 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -23,8 +23,7 @@ condition::condition() : cmd_( SLEEPING), waiters_( 0), enter_mtx_( false), - check_mtx_(), - waiting_() + check_mtx_() {} condition::~condition() @@ -33,6 +32,8 @@ condition::~condition() void condition::notify_one() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + enter_mtx_.lock(); if ( 0 == waiters_) @@ -40,43 +41,41 @@ condition::notify_one() enter_mtx_.unlock(); return; } - - if ( ! waiting_.empty() ) + + command expected = NOTIFY_ONE; + while ( SLEEPING != cmd_.compare_exchange_strong( expected, SLEEPING) ) { - detail::fiber_base::ptr_t f; - do - { - f.swap( waiting_.front() ); - waiting_.pop_front(); - } while ( f->is_terminated() ); - if ( f) - detail::scheduler::instance().notify( f); + this_fiber::yield(); + expected = NOTIFY_ONE; } - cmd_ = NOTIFY_ONE; } void condition::notify_all() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + + //This mutex guarantees that no other thread can enter to the + //do_timed_wait method logic, so that thread count will be + //constant until the function writes a NOTIFY_ALL command. + //It also guarantees that no other notification can be signaled + //on this spin_condition before this one ends enter_mtx_.lock(); + //Return if there are no waiters if ( 0 == waiters_) { enter_mtx_.unlock(); return; } - - if ( ! waiting_.empty() ) - { - BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_) - { - if ( ! f->is_terminated() ) - detail::scheduler::instance().notify( f); - } - waiting_.clear(); - } - cmd_ = NOTIFY_ALL; + //Notify that all threads should execute wait logic + command expected = NOTIFY_ALL; + while ( SLEEPING != cmd_.compare_exchange_strong( expected, SLEEPING) ) + { + this_fiber::yield(); + expected = NOTIFY_ALL; + } } }} diff --git a/src/count_down_event.cpp b/src/count_down_event.cpp index 9344a77e..42d2e6da 100644 --- a/src/count_down_event.cpp +++ b/src/count_down_event.cpp @@ -22,8 +22,9 @@ namespace boost { namespace fibers { count_down_event::count_down_event( std::size_t initial) : - initial_( initial), - current_( initial_), + initial_( initial), + current_( initial_), + waiting_mtx_(), waiting_() {} @@ -42,14 +43,12 @@ count_down_event::is_set() const void count_down_event::set() { - if ( 0 == current_) return; - if ( 0 == --current_) + if ( 0 == current_) return; //FIXME: set to initial_ instead? + if ( 0 == --current_) { + detail::spin_mutex::scoped_lock lk( waiting_mtx_); BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_) - { - if ( ! f->is_terminated() ) - detail::scheduler::instance().notify( f); - } + { f->set_ready(); } waiting_.clear(); } } @@ -57,34 +56,34 @@ count_down_event::set() void count_down_event::wait() { - while ( 0 != current_) - { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().wait(); - } - else - detail::scheduler::instance().run(); - } + BOOST_ASSERT( this_fiber::is_fiberized() ); + + while ( 0 != current_) + { + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); + } } bool count_down_event::timed_wait( chrono::system_clock::time_point const& abs_time) { - while ( 0 != current_) - { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().sleep( abs_time); - } - else - detail::scheduler::instance().run(); - } - return chrono::system_clock::now() <= abs_time; + BOOST_ASSERT( this_fiber::is_fiberized() ); + + if ( chrono::system_clock::now() >= abs_time) return false; + + while ( 0 != current_) + { + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); + + if ( chrono::system_clock::now() >= abs_time) return false; + } + return true; } }} diff --git a/src/detail/fiber_base.cpp b/src/detail/fiber_base.cpp index 53488fc0..29311e1b 100644 --- a/src/detail/fiber_base.cpp +++ b/src/detail/fiber_base.cpp @@ -38,7 +38,6 @@ fiber_base::resume() BOOST_ASSERT( ! is_terminated() ); BOOST_ASSERT( ! is_running() ); - set_running(); context::jump_fcontext( & caller_, callee_, 0, preserve_fpu() ); BOOST_ASSERT( ! is_running() ); @@ -49,7 +48,6 @@ fiber_base::suspend() { BOOST_ASSERT( is_running() ); - set_waiting(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); if ( unwind_requested() ) @@ -61,7 +59,6 @@ fiber_base::yield() { BOOST_ASSERT( is_running() ); - set_ready(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); if ( unwind_requested() ) diff --git a/src/detail/spin_mutex.cpp b/src/detail/spin_mutex.cpp index 3cbbe058..b14ea2c5 100644 --- a/src/detail/spin_mutex.cpp +++ b/src/detail/spin_mutex.cpp @@ -59,6 +59,6 @@ spin_mutex::try_lock() void spin_mutex::unlock() -{ state_.store( UNLOCKED); } +{ state_ = UNLOCKED; } }}} diff --git a/src/manual_reset_event.cpp b/src/manual_reset_event.cpp index 3568c13f..1c1ee3d2 100644 --- a/src/manual_reset_event.cpp +++ b/src/manual_reset_event.cpp @@ -25,28 +25,27 @@ manual_reset_event::manual_reset_event( bool isset) : state_( isset ? SET : RESET), waiters_( 0), enter_mtx_( false), + waiting_mtx_(), waiting_() {} void manual_reset_event::wait() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + { mutex::scoped_lock lk( enter_mtx_); BOOST_ASSERT( lk); ++waiters_; } - while ( RESET == state_) + while ( RESET == state_.exchange( RESET, memory_order_acquire) ) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( detail::scheduler::instance().active() ); - detail::scheduler::instance().wait(); - } - else - detail::scheduler::instance().run(); + detail::scheduler::instance().wait( lk); } if ( 0 == --waiters_) @@ -56,39 +55,44 @@ manual_reset_event::wait() bool manual_reset_event::timed_wait( chrono::system_clock::time_point const& abs_time) { + BOOST_ASSERT( this_fiber::is_fiberized() ); + + if ( chrono::system_clock::now() >= abs_time) return false; + { mutex::scoped_lock lk( enter_mtx_); BOOST_ASSERT( lk); ++waiters_; } - while ( RESET == state_) + while ( RESET == state_.exchange( RESET, memory_order_acquire) ) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( + detail::spin_mutex::scoped_lock lk( waiting_mtx_); + waiting_.push_back( detail::scheduler::instance().active() ); - detail::scheduler::instance().sleep( abs_time); - } - else - detail::scheduler::instance().run(); + detail::scheduler::instance().wait( lk); + + if ( chrono::system_clock::now() >= abs_time) return false; } if ( 0 == --waiters_) enter_mtx_.unlock(); - return chrono::system_clock::now() <= abs_time; + + return true; } bool manual_reset_event::try_wait() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + { mutex::scoped_lock lk( enter_mtx_); BOOST_ASSERT( lk); ++waiters_; } - bool result = SET == state_; + bool result = SET == state_.exchange( RESET, memory_order_acquire); if ( 0 == --waiters_) enter_mtx_.unlock(); @@ -99,24 +103,27 @@ manual_reset_event::try_wait() void manual_reset_event::set() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + mutex::scoped_lock lk( enter_mtx_); BOOST_ASSERT( lk); - if ( RESET == state_) + if ( RESET == state_.exchange( SET, memory_order_acquire) ) { - state_ = SET; + detail::spin_mutex::scoped_lock lk( waiting_mtx_); BOOST_FOREACH ( detail::fiber_base::ptr_t const& f, waiting_) - { - if ( ! f->is_terminated() ) - detail::scheduler::instance().notify( f); - } + { f->set_ready(); } waiting_.clear(); } } void manual_reset_event::reset() -{ state_ = RESET; } +{ + BOOST_ASSERT( this_fiber::is_fiberized() ); + + state_ = RESET; +} }} diff --git a/src/mutex.cpp b/src/mutex.cpp index dd3730ac..e7b0f517 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -23,6 +23,7 @@ namespace fibers { mutex::mutex( bool checked) : state_( UNLOCKED), owner_(), + mtx_(), waiting_(), checked_( checked) {} @@ -30,51 +31,41 @@ mutex::mutex( bool checked) : void mutex::lock() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + while ( LOCKED == state_.exchange( LOCKED, memory_order_acquire) ) { - if ( this_fiber::is_fiberized() ) - { - waiting_.push_back( - detail::scheduler::instance().active() ); - detail::scheduler::instance().wait(); - } - else - detail::scheduler::instance().run(); + detail::spin_mutex::scoped_lock lk( mtx_); + waiting_.push_back( + detail::scheduler::instance().active() ); + detail::scheduler::instance().wait( lk); } - if ( this_fiber::is_fiberized() ) - owner_ = detail::scheduler::instance().active()->get_id(); - else - owner_ = detail::fiber_base::id(); + owner_ = detail::scheduler::instance().active()->get_id(); } bool mutex::try_lock() { - if ( LOCKED == state_.exchange( LOCKED, memory_order_acquire) ) return false; - if ( this_fiber::is_fiberized() ) - owner_ = detail::scheduler::instance().active()->get_id(); - else - owner_ = detail::fiber_base::id(); - return true; + BOOST_ASSERT( this_fiber::is_fiberized() ); + + return UNLOCKED == state_.exchange( LOCKED, memory_order_acquire); } void mutex::unlock() { + BOOST_ASSERT( this_fiber::is_fiberized() ); + if ( checked_) { - if ( this_fiber::is_fiberized() ) - { - if ( detail::scheduler::instance().active()->get_id() != owner_) - std::abort(); - } - else if ( detail::fiber_base::id() != owner_) - std::abort(); + if ( detail::scheduler::instance().active()->get_id() != owner_) + std::abort(); } - owner_ = detail::fiber_base::id(); - state_.store( UNLOCKED); + state_ = UNLOCKED; + + detail::spin_mutex::scoped_lock lk( mtx_); if ( ! waiting_.empty() ) { detail::fiber_base::ptr_t f; @@ -82,9 +73,7 @@ mutex::unlock() { f.swap( waiting_.front() ); waiting_.pop_front(); - } while ( f->is_terminated() ); - if ( f) - detail::scheduler::instance().notify( f); + } while ( ! f->set_ready() ); } } diff --git a/src/round_robin.cpp b/src/round_robin.cpp index e654ed61..24f95006 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -24,9 +24,8 @@ #define RESUME_FIBER( f_) \ BOOST_ASSERT( f_); \ BOOST_ASSERT( ! f_->is_terminated() ); \ - BOOST_ASSERT( ! f_->is_running() ); \ + f_->set_running(); \ f_->resume(); \ - BOOST_ASSERT( ! f_->is_running() ); namespace boost { namespace fibers { @@ -76,7 +75,13 @@ round_robin::join( detail::fiber_base::ptr_t const& f) // so that active-fiber gets suspended f->join( active_fiber_); // suspend active-fiber until f terminates - wait(); + // fiber will be added to waiting-queue + f_idx_.insert( schedulable( active_fiber_) ); + // set active_fiber to state_waiting + active_fiber_->set_waiting(); + // suspend fiber + active_fiber_->suspend(); + // fiber is resumed // f has teminated and active-fiber is resumed } else @@ -91,45 +96,28 @@ round_robin::join( detail::fiber_base::ptr_t const& f) void round_robin::cancel( detail::fiber_base::ptr_t const& f) { - BOOST_ASSERT( f); - BOOST_ASSERT( f != active_fiber_); - - // ignore completed fiber - if ( f->is_terminated() ) return; - - detail::fiber_base::ptr_t tmp = active_fiber_; - { - BOOST_SCOPE_EXIT( & tmp, & active_fiber_) { - active_fiber_ = tmp; - } BOOST_SCOPE_EXIT_END - active_fiber_ = f; - // terminate fiber means unwinding its stack - // so it becomes complete and joining fibers - // will be notified - active_fiber_->terminate(); - } - // erase completed fiber from waiting-queue - f_idx_.erase( f); - - BOOST_ASSERT( f->is_terminated() ); -} - -void -round_robin::notify( detail::fiber_base::ptr_t const& f) -{ - BOOST_ASSERT( f); - BOOST_ASSERT( ! f->is_terminated() ); - BOOST_ASSERT( ! f->is_running() ); - BOOST_ASSERT( f != active_fiber_); - - // remove fiber from wait-queue - f_idx_.erase( f); - // push fiber at the front of the runnable-queue - rqueue_.push_front( f); - - BOOST_ASSERT( ! f->is_terminated() ); - BOOST_ASSERT( ! f->is_running() ); - BOOST_ASSERT( f != active_fiber_); + BOOST_ASSERT_MSG( false, "not implemented"); +// BOOST_ASSERT( f); +// BOOST_ASSERT( f != active_fiber_); +// +// // ignore completed fiber +// if ( f->is_terminated() ) return; +// +// detail::fiber_base::ptr_t tmp = active_fiber_; +// { +// BOOST_SCOPE_EXIT( & tmp, & active_fiber_) { +// active_fiber_ = tmp; +// } BOOST_SCOPE_EXIT_END +// active_fiber_ = f; +// // terminate fiber means unwinding its stack +// // so it becomes complete and joining fibers +// // will be notified +// active_fiber_->terminate(); +// } +// // erase completed fiber from waiting-queue +// f_idx_.erase( f); +// +// BOOST_ASSERT( f->is_terminated() ); } bool @@ -167,13 +155,17 @@ round_robin::run() } void -round_robin::wait() +round_robin::wait( detail::spin_mutex::scoped_lock & lk) { BOOST_ASSERT( active_fiber_); BOOST_ASSERT( active_fiber_->is_running() ); // fiber will be added to waiting-queue f_idx_.insert( schedulable( active_fiber_) ); + // set active_fiber to state_waiting + active_fiber_->set_waiting(); + // unlock Lock assoc. with sync. primitive + lk.unlock(); // suspend fiber active_fiber_->suspend(); // fiber is resumed @@ -190,6 +182,9 @@ round_robin::yield() // yield() suspends the fiber and adds it // immediately to runnable-queue rqueue_.push_back( active_fiber_); + // set active_fiber to state_ready + active_fiber_->set_ready(); + // suspend fiber active_fiber_->yield(); // fiber is resumed @@ -207,6 +202,9 @@ round_robin::sleep( chrono::system_clock::time_point const& abs_time) // fiber is added with a dead-line and gets suspended // each call of run() will check if dead-line has reached wqueue_.insert( schedulable( active_fiber_, abs_time) ); + // set active_fiber to state_waiting + active_fiber_->set_waiting(); + // suspend fiber active_fiber_->suspend(); // fiber is resumed, dead-line has been reached }