diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index 1140e359..04dae2a8 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -27,6 +27,7 @@ lib boost_fibers count_down_event.cpp detail/fiber_base.cpp detail/scheduler.cpp + detail/spin_mutex.cpp fiber.cpp manual_reset_event.cpp mutex.cpp diff --git a/include/boost/fiber/attributes.hpp b/include/boost/fiber/attributes.hpp index df651cac..50c200c7 100644 --- a/include/boost/fiber/attributes.hpp +++ b/include/boost/fiber/attributes.hpp @@ -24,54 +24,27 @@ namespace fibers { struct attributes { std::size_t size; - flag_unwind_t do_unwind; flag_fpu_t preserve_fpu; attributes() BOOST_NOEXCEPT : size( stack_allocator::default_stacksize() ), - do_unwind( stack_unwind), preserve_fpu( fpu_preserved) {} explicit attributes( std::size_t size_) BOOST_NOEXCEPT : size( size_), - do_unwind( stack_unwind), - preserve_fpu( fpu_preserved) - {} - - explicit attributes( flag_unwind_t do_unwind_) BOOST_NOEXCEPT : - size( stack_allocator::default_stacksize() ), - do_unwind( do_unwind_), preserve_fpu( fpu_preserved) {} explicit attributes( flag_fpu_t preserve_fpu_) BOOST_NOEXCEPT : size( stack_allocator::default_stacksize() ), - do_unwind( stack_unwind), preserve_fpu( preserve_fpu_) {} - explicit attributes( - std::size_t size_, - flag_unwind_t do_unwind_) BOOST_NOEXCEPT : - size( size_), - do_unwind( do_unwind_), - preserve_fpu( fpu_preserved) - {} - explicit attributes( std::size_t size_, flag_fpu_t preserve_fpu_) BOOST_NOEXCEPT : size( size_), - do_unwind( stack_unwind), - preserve_fpu( preserve_fpu_) - {} - - explicit attributes( - flag_unwind_t do_unwind_, - flag_fpu_t preserve_fpu_) BOOST_NOEXCEPT : - size( stack_allocator::default_stacksize() ), - do_unwind( do_unwind_), preserve_fpu( preserve_fpu_) {} }; diff --git a/include/boost/fiber/detail/fiber_base.hpp b/include/boost/fiber/detail/fiber_base.hpp index 017af8bf..55333b25 100644 --- a/include/boost/fiber/detail/fiber_base.hpp +++ b/include/boost/fiber/detail/fiber_base.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -22,6 +23,8 @@ #include #include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -40,16 +43,16 @@ private: template< typename X, typename Y, typename Z > friend class fiber_object; - std::size_t use_count_; + atomic< std::size_t > use_count_; + atomic< state_t > state_; + atomic< int > priority_; context::fcontext_t caller_; context::fcontext_t * callee_; - int priority_; int flags_; exception_ptr except_; + spin_mutex mtx_; std::vector< ptr_t > joining_; - void notify_(); - protected: virtual void deallocate_object() = 0; virtual void unwind_stack() = 0; @@ -106,7 +109,7 @@ public: { return 0 == impl_; } }; - fiber_base( context::fcontext_t *, bool, bool); + fiber_base( context::fcontext_t *, bool); virtual ~fiber_base() {} @@ -114,15 +117,17 @@ public: { return id( ptr_t( const_cast< fiber_base * >( this) ) ); } int priority() const BOOST_NOEXCEPT - { return priority_; } + { return priority_.load(); } void priority( int prio) BOOST_NOEXCEPT - { priority_ = prio; } + { priority_.store( prio); } void resume(); void suspend(); + void yield(); + void terminate(); void join( ptr_t const&); @@ -136,20 +141,56 @@ public: bool preserve_fpu() const BOOST_NOEXCEPT { return 0 != ( flags_ & flag_preserve_fpu); } - bool is_complete() const BOOST_NOEXCEPT - { return 0 != ( flags_ & flag_complete); } + bool is_terminated() const BOOST_NOEXCEPT + { return state_terminated == state_.load(); } - bool is_canceled() const BOOST_NOEXCEPT - { return 0 != ( flags_ & flag_canceled); } + bool is_ready() const BOOST_NOEXCEPT + { return state_ready == state_.load(); } - bool is_resumed() const BOOST_NOEXCEPT - { return 0 != ( flags_ & flag_resumed); } + bool is_running() const BOOST_NOEXCEPT + { return state_running == state_.load(); } + + bool is_waiting() const BOOST_NOEXCEPT + { return state_waiting == state_.load(); } + + bool set_terminated() BOOST_NOEXCEPT + { + state_t expected = state_running; + return state_.compare_exchange_weak( expected, state_terminated, memory_order_release); + } + + bool set_ready() BOOST_NOEXCEPT + { + state_t expected = state_waiting; + bool result = state_.compare_exchange_weak( expected, state_ready, memory_order_release); + if ( ! result && state_running == expected) + result = state_.compare_exchange_weak( expected, state_ready, memory_order_release); + return result; + } + + bool set_running() BOOST_NOEXCEPT + { + state_t expected = state_ready; + return state_.compare_exchange_weak( expected, state_running, memory_order_release); + } + + bool set_waiting() BOOST_NOEXCEPT + { + state_t expected = state_running; + return state_.compare_exchange_weak( expected, state_waiting, memory_order_release); + } friend inline void intrusive_ptr_add_ref( fiber_base * p) BOOST_NOEXCEPT - { ++p->use_count_; } + { p->use_count_.fetch_add( 1, memory_order_relaxed); } friend inline void intrusive_ptr_release( fiber_base * p) - { if ( --p->use_count_ == 0) p->deallocate_object(); } + { + if ( 1 == p->use_count_.fetch_sub( 1, memory_order_release) ) + { + atomic_thread_fence( memory_order_acquire); + p->deallocate_object(); + } + } }; }}} diff --git a/include/boost/fiber/detail/fiber_object.hpp b/include/boost/fiber/detail/fiber_object.hpp index f19f13cd..e5a384f7 100644 --- a/include/boost/fiber/detail/fiber_object.hpp +++ b/include/boost/fiber/detail/fiber_object.hpp @@ -67,27 +67,27 @@ private: void enter_() { - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, reinterpret_cast< intptr_t >( this), preserve_fpu() ); - if ( except_) rethrow_exception( except_); + BOOST_ASSERT( ! except_); } protected: void unwind_stack() BOOST_NOEXCEPT { - BOOST_ASSERT( ! is_complete() ); + BOOST_ASSERT( ! is_terminated() ); flags_ |= flag_unwind_stack; - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, 0, preserve_fpu() ); flags_ &= ~flag_unwind_stack; - BOOST_ASSERT( is_complete() ); + BOOST_ASSERT( is_terminated() ); } public: @@ -99,7 +99,6 @@ public: context::make_fcontext( stack_alloc.allocate( attr.size), attr.size, trampoline< fiber_object >), - stack_unwind == attr.do_unwind, fpu_preserved == attr.preserve_fpu), fn_( forward< Fn >( fn) ), stack_( fiber_base::callee_->fc_stack), @@ -114,7 +113,6 @@ public: context::make_fcontext( stack_alloc.allocate( attr.size), attr.size, trampoline< fiber_object >), - stack_unwind == attr.do_unwind, fpu_preserved == attr.preserve_fpu), fn_( fn), stack_( fiber_base::callee_->fc_stack), @@ -129,7 +127,6 @@ public: context::make_fcontext( stack_alloc.allocate( attr.size), attr.size, trampoline< fiber_object >), - stack_unwind == attr.do_unwind, fpu_preserved == attr.preserve_fpu), fn_( fn), stack_( fiber_base::callee_->fc_stack), @@ -143,20 +140,19 @@ public: void exec() { - BOOST_ASSERT( ! is_complete() ); - - suspend(); + BOOST_ASSERT( ! is_terminated() ); try - { fn_(); } + { + yield(); + fn_(); + } catch ( forced_unwind const&) {} catch (...) { except_ = current_exception(); } - flags_ &= ~flag_resumed; - flags_ |= flag_complete; - + set_terminated(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); BOOST_ASSERT_MSG( false, "fiber is complete"); } @@ -192,27 +188,27 @@ private: void enter_() { - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, reinterpret_cast< intptr_t >( this), preserve_fpu() ); - if ( except_) rethrow_exception( except_); + BOOST_ASSERT( ! except_); } protected: void unwind_stack() BOOST_NOEXCEPT { - BOOST_ASSERT( ! is_complete() ); + BOOST_ASSERT( ! is_terminated() ); flags_ |= flag_unwind_stack; - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, 0, preserve_fpu() ); flags_ &= ~flag_unwind_stack; - BOOST_ASSERT( is_complete() ); + BOOST_ASSERT( is_terminated() ); } public: @@ -223,7 +219,6 @@ public: context::make_fcontext( stack_alloc.allocate( attr.size), attr.size, trampoline< fiber_object >), - stack_unwind == attr.do_unwind, fpu_preserved == attr.preserve_fpu), fn_( fn), stack_( fiber_base::callee_->fc_stack), @@ -236,20 +231,19 @@ public: void exec() { - BOOST_ASSERT( ! is_complete() ); - - suspend(); + BOOST_ASSERT( ! is_terminated() ); try - { fn_(); } + { + yield(); + fn_(); + } catch ( forced_unwind const&) {} catch (...) { except_ = current_exception(); } - flags_ &= ~flag_resumed; - flags_ |= flag_complete; - + set_terminated(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); BOOST_ASSERT_MSG( false, "fiber is complete"); } @@ -285,27 +279,27 @@ private: void enter_() { - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, reinterpret_cast< intptr_t >( this), preserve_fpu() ); - if ( except_) rethrow_exception( except_); + BOOST_ASSERT( ! except_); } protected: void unwind_stack() BOOST_NOEXCEPT { - BOOST_ASSERT( ! is_complete() ); + BOOST_ASSERT( ! is_terminated() ); flags_ |= flag_unwind_stack; - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, 0, preserve_fpu() ); flags_ &= ~flag_unwind_stack; - BOOST_ASSERT( is_complete() ); + BOOST_ASSERT( is_terminated() ); } public: @@ -316,7 +310,6 @@ public: context::make_fcontext( stack_alloc.allocate( attr.size), attr.size, trampoline< fiber_object >), - stack_unwind == attr.do_unwind, fpu_preserved == attr.preserve_fpu), fn_( fn), stack_( fiber_base::callee_->fc_stack), @@ -329,20 +322,19 @@ public: void exec() { - BOOST_ASSERT( ! is_complete() ); - - suspend(); + BOOST_ASSERT( ! is_terminated() ); try - { fn_(); } + { + yield(); + fn_(); + } catch ( forced_unwind const&) {} catch (...) { except_ = current_exception(); } - flags_ &= ~flag_resumed; - flags_ |= flag_complete; - + set_terminated(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); BOOST_ASSERT_MSG( false, "fiber is complete"); } diff --git a/include/boost/fiber/detail/flags.hpp b/include/boost/fiber/detail/flags.hpp index 0bd257aa..5e300f5f 100644 --- a/include/boost/fiber/detail/flags.hpp +++ b/include/boost/fiber/detail/flags.hpp @@ -23,10 +23,7 @@ enum flag_t { flag_force_unwind = 1 << 1, flag_unwind_stack = 1 << 2, - flag_preserve_fpu = 1 << 3, - flag_complete = 1 << 4, - flag_resumed = 1 << 5, - flag_canceled = 1 << 6 + flag_preserve_fpu = 1 << 3 }; }}} diff --git a/include/boost/fiber/detail/queue.hpp b/include/boost/fiber/detail/queue.hpp new file mode 100644 index 00000000..34b5058e --- /dev/null +++ b/include/boost/fiber/detail/queue.hpp @@ -0,0 +1,132 @@ + +// Copyright Oliver Kowalke 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// node-base locking based on 'C++ Concurrency in Action', Anthony Williams + +#ifndef BOOST_FIBERS_DETAIL_QUEUE_H +#define BOOST_FIBERS_DETAIL_QUEUE_H + +#include + +#include +#include +#include + +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +struct node +{ + typedef intrusive_ptr< node > ptr_t; + + std::size_t use_count; + fiber_base::ptr_t f; + ptr next; + + node() : + use_count( 0), + f(), + next() + {} +}; + +inline +void intrusive_ptr_add_ref( node * p) +{ ++p->use_count; } + +inline +void intrusive_ptr_release( node * p) +{ if ( 0 == --p->use_count) delete p; } + +class queue : private noncopyable +{ +private: + node::ptr_t head_; + mutable spin_mutex head_mtx_; + node::ptr_t tail_; + mutable spin_mutex tail_mtx_; + + bool empty_() const + { return head_ == get_tail_(); } + + node_type::ptr get_tail_() const + { + spin_mutex::scoped_lock lk( tail_mtx_); + node::ptr_t tmp = tail_; + return tmp; + } + + node::ptr_t pop_head_() + { + node_type::ptr_t old_head = head_; + head_ = old_head->next; + return old_head; + } + +public: + queue() : + head_( new node_type() ), + head_mtx_(), + tail_( head_), + tail_mtx_() + {} + + bool empty() const + { + spin_mutex::scoped_lock lk( head_mtx_); + return empty_(); + } + + void push( fiber_base::ptr_t const& f) + { + node::ptr_t new_node( new node_type() ); + { + spin_mutex::scoped_lock lk( tail_mtx_); + tail_->f = f; + tail_->next = new_node; + tail_ = new_node; + } + } + + void notify_one() + { + spin_mutex::scoped_lock lk( head_mtx_); + while ( ! empty_() ) + { + BOOST_ASSERT( head_->f); + bool result = head_->f->set_ready(); + pop_head_(); + if ( result) break; + } + } + + void notify_all() + { + spin_mutex::scoped_lock lk( head_mtx_); + while ( ! empty_() ) + { + BOOST_ASSERT( head_->f); + head_->f->set_ready(); + pop_head_(); + } + } +}; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAIL_QUEUE_H diff --git a/include/boost/fiber/detail/spin_mutex.hpp b/include/boost/fiber/detail/spin_mutex.hpp new file mode 100644 index 00000000..48ae8d99 --- /dev/null +++ b/include/boost/fiber/detail/spin_mutex.hpp @@ -0,0 +1,54 @@ + +// Copyright Oliver Kowalke 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +// based on boost::interprocess::sync::interprocess_spin::mutex + +#ifndef BOOST_FIBERS_SPIN_MUTEX_H +#define BOOST_FIBERS_SPIN_MUTEX_H + +#include +#include +#include +#include + +#include + +namespace boost { +namespace fibers { +namespace detail { + +class BOOST_FIBERS_DECL spin_mutex : private noncopyable +{ +private: + enum state + { + LOCKED = 0, + UNLOCKED + }; + + atomic< state > state_; + +public: + typedef unique_lock< spin_mutex > scoped_lock; + + spin_mutex(); + + void lock(); + + bool try_lock(); + + bool timed_lock( chrono::system_clock::time_point const& abs_time); + + template< typename TimeDuration > + bool timed_lock( TimeDuration const& rel_time) + { return timed_lock( chrono::system_clock::now() + rel_time); } + + void unlock(); +}; + +}}} + +#endif // BOOST_FIBERS_SPIN_MUTEX_H diff --git a/include/boost/fiber/detail/states.hpp b/include/boost/fiber/detail/states.hpp new file mode 100644 index 00000000..285298c4 --- /dev/null +++ b/include/boost/fiber/detail/states.hpp @@ -0,0 +1,36 @@ + +// Copyright Oliver Kowalke 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_DETAIL_STATES_H +#define BOOST_FIBERS_DETAIL_STATES_H + +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +enum state_t +{ + state_terminated = 0, + state_ready = 1 << 1, + state_running = 1 << 2, + state_waiting = 1 << 3 +}; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAILSTATEGS_H diff --git a/include/boost/fiber/fiber.hpp b/include/boost/fiber/fiber.hpp index 08af1ca5..46ebb567 100644 --- a/include/boost/fiber/fiber.hpp +++ b/include/boost/fiber/fiber.hpp @@ -39,8 +39,6 @@ namespace fibers { class BOOST_FIBERS_DECL fiber { private: - friend void migrate( fiber &); - struct dummy { void nonnull() {} }; @@ -305,16 +303,17 @@ public: fiber & operator=( BOOST_RV_REF( fiber) other) BOOST_NOEXCEPT { + if ( joinable() ) std::terminate(); fiber tmp( move( other) ); swap( tmp); return * this; } operator safe_bool() const BOOST_NOEXCEPT - { return ( ! empty() && ! impl_->is_complete() ) ? & dummy::nonnull : 0; } + { return ( ! empty() && ! impl_->is_terminated() ) ? & dummy::nonnull : 0; } bool operator!() const BOOST_NOEXCEPT - { return empty() || impl_->is_complete(); } + { return empty() || impl_->is_terminated(); } void swap( fiber & other) BOOST_NOEXCEPT { impl_.swap( other.impl_); } diff --git a/include/boost/fiber/mutex.hpp b/include/boost/fiber/mutex.hpp index ada8ef16..e8878a13 100644 --- a/include/boost/fiber/mutex.hpp +++ b/include/boost/fiber/mutex.hpp @@ -11,6 +11,7 @@ #include +#include #include #include #include @@ -33,13 +34,13 @@ namespace fibers { class BOOST_FIBERS_DECL mutex : private noncopyable { private: - enum state - { - LOCKED = 0, - UNLOCKED - }; + enum state + { + LOCKED = 0, + UNLOCKED + }; - state state_; + atomic< state > state_; detail::fiber_base::id owner_; std::deque< detail::fiber_base::ptr_t @@ -47,15 +48,15 @@ private: bool checked_; public: - typedef unique_lock< mutex > scoped_lock; + typedef unique_lock< mutex > scoped_lock; - mutex( bool = true); + mutex( bool = true); - void lock(); + void lock(); - bool try_lock(); + bool try_lock(); - void unlock(); + void unlock(); }; typedef mutex try_mutex; diff --git a/src/auto_reset_event.cpp b/src/auto_reset_event.cpp index beb540c8..ba5495ec 100644 --- a/src/auto_reset_event.cpp +++ b/src/auto_reset_event.cpp @@ -82,7 +82,7 @@ auto_reset_event::set() { f.swap( waiting_.front() ); waiting_.pop_front(); - } while ( f->is_complete() ); + } while ( f->is_terminated() ); if ( f) detail::scheduler::instance().notify( f); } diff --git a/src/condition.cpp b/src/condition.cpp index 35d74204..077856b1 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -48,7 +48,7 @@ condition::notify_one() { f.swap( waiting_.front() ); waiting_.pop_front(); - } while ( f->is_complete() ); + } while ( f->is_terminated() ); if ( f) detail::scheduler::instance().notify( f); } @@ -70,7 +70,7 @@ condition::notify_all() { BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_) { - if ( ! f->is_complete() ) + if ( ! f->is_terminated() ) detail::scheduler::instance().notify( f); } waiting_.clear(); diff --git a/src/count_down_event.cpp b/src/count_down_event.cpp index 5051b312..9344a77e 100644 --- a/src/count_down_event.cpp +++ b/src/count_down_event.cpp @@ -47,7 +47,7 @@ count_down_event::set() { BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_) { - if ( ! f->is_complete() ) + if ( ! f->is_terminated() ) detail::scheduler::instance().notify( f); } waiting_.clear(); diff --git a/src/detail/fiber_base.cpp b/src/detail/fiber_base.cpp index 5c8517da..53488fc0 100644 --- a/src/detail/fiber_base.cpp +++ b/src/detail/fiber_base.cpp @@ -20,84 +20,76 @@ namespace boost { namespace fibers { namespace detail { -void -fiber_base::notify_() -{ - BOOST_ASSERT( is_complete() ); - BOOST_ASSERT( ! is_resumed() ); - - BOOST_FOREACH( fiber_base::ptr_t & p, joining_) - { boost::fibers::detail::scheduler::instance().notify( p); } - joining_.clear(); -} - -fiber_base::fiber_base( context::fcontext_t * callee, bool unwind, bool preserve_fpu) : +fiber_base::fiber_base( context::fcontext_t * callee, bool preserve_fpu) : use_count_( 0), + state_( state_ready), + priority_( 0), caller_(), callee_( callee), - priority_( 0), flags_( 0), except_(), + mtx_(), joining_() -{ - if ( unwind) flags_ |= flag_force_unwind; - if ( preserve_fpu) flags_ |= flag_preserve_fpu; -} +{ if ( preserve_fpu) flags_ |= flag_preserve_fpu; } void fiber_base::resume() { - BOOST_ASSERT( ! is_complete() ); - BOOST_ASSERT( ! is_resumed() ); + BOOST_ASSERT( ! is_terminated() ); + BOOST_ASSERT( ! is_running() ); - flags_ |= flag_resumed; + set_running(); context::jump_fcontext( & caller_, callee_, 0, preserve_fpu() ); - if ( is_complete() ) notify_(); - - BOOST_ASSERT( ! is_resumed() ); + BOOST_ASSERT( ! is_running() ); } void fiber_base::suspend() { - BOOST_ASSERT( ! is_complete() ); - BOOST_ASSERT( is_resumed() ); + BOOST_ASSERT( is_running() ); - flags_ &= ~flag_resumed; + set_waiting(); context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); - BOOST_ASSERT( is_resumed() ); + if ( unwind_requested() ) + throw forced_unwind(); +} - if ( 0 != ( flags_ & flag_unwind_stack) ) +void +fiber_base::yield() +{ + BOOST_ASSERT( is_running() ); + + set_ready(); + context::jump_fcontext( callee_, & caller_, 0, preserve_fpu() ); + + if ( unwind_requested() ) throw forced_unwind(); } void fiber_base::terminate() { - BOOST_ASSERT( ! is_resumed() ); + BOOST_ASSERT( is_terminated() ); - if ( ! is_complete() ) - { - flags_ |= flag_canceled; - if ( ! is_complete() && force_unwind() ) - unwind_stack(); - } + unwind_stack(); - notify_(); - - BOOST_ASSERT( is_complete() ); - BOOST_ASSERT( ! is_resumed() ); - BOOST_ASSERT( joining_.empty() ); + // fiber_base::terminate() is called by ~fiber_object() + // therefore protecting by mtx_ is not required + // and joining_ is not required to be cleared + BOOST_FOREACH( fiber_base::ptr_t & p, joining_) + { p->set_ready(); } } void fiber_base::join( ptr_t const& p) { - BOOST_ASSERT( ! p->is_complete() ); - BOOST_ASSERT( p->is_resumed() ); + BOOST_ASSERT( p->is_running() ); + // protect against concurrent access to joining_ + spin_mutex::scoped_lock lk( mtx_); + if ( is_terminated() ) return; joining_.push_back( p); } diff --git a/src/detail/spin_mutex.cpp b/src/detail/spin_mutex.cpp new file mode 100644 index 00000000..3cbbe058 --- /dev/null +++ b/src/detail/spin_mutex.cpp @@ -0,0 +1,64 @@ + +// Copyright Oliver Kowalke 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#define BOOST_FIBERS_SOURCE + +#include + +#include +#include + +namespace boost { +namespace fibers { +namespace detail { + +spin_mutex::spin_mutex() : + state_( UNLOCKED) +{} + +void +spin_mutex::lock() +{ + while ( LOCKED == state_.exchange( LOCKED, memory_order_acquire) ) + { + // busy-wait + BOOST_ASSERT( this_fiber::is_fiberized() ); + this_fiber::yield(); + } +} + +bool +spin_mutex::timed_lock( chrono::system_clock::time_point const& abs_time) +{ + if ( chrono::system_clock::now() >= abs_time) + return false; + + for (;;) + { + if ( try_lock() ) break; + + if ( chrono::system_clock::now() >= abs_time) + return false; + + //this_fiber::interruption_point(); + //FIXME: what to do if not a fiber + BOOST_ASSERT( this_fiber::is_fiberized() ); + this_fiber::yield(); + //this_fiber::interruption_point(); + } + + return true; +} + +bool +spin_mutex::try_lock() +{ return UNLOCKED == state_.exchange( LOCKED, memory_order_acquire); } + +void +spin_mutex::unlock() +{ state_.store( UNLOCKED); } + +}}} diff --git a/src/fiber.cpp b/src/fiber.cpp index 99d48254..885acbd9 100644 --- a/src/fiber.cpp +++ b/src/fiber.cpp @@ -40,7 +40,7 @@ fiber::priority( int prio) { BOOST_ASSERT( impl_); - impl_->priority( prio); + detail::scheduler::instance().priority( impl_, prio); } void diff --git a/src/manual_reset_event.cpp b/src/manual_reset_event.cpp index 983bd8a2..3568c13f 100644 --- a/src/manual_reset_event.cpp +++ b/src/manual_reset_event.cpp @@ -107,7 +107,7 @@ manual_reset_event::set() state_ = SET; BOOST_FOREACH ( detail::fiber_base::ptr_t const& f, waiting_) { - if ( ! f->is_complete() ) + if ( ! f->is_terminated() ) detail::scheduler::instance().notify( f); } waiting_.clear(); diff --git a/src/mutex.cpp b/src/mutex.cpp index 8ebda073..dd3730ac 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -30,7 +30,7 @@ mutex::mutex( bool checked) : void mutex::lock() { - while ( UNLOCKED != state_) + while ( LOCKED == state_.exchange( LOCKED, memory_order_acquire) ) { if ( this_fiber::is_fiberized() ) { @@ -41,7 +41,6 @@ mutex::lock() else detail::scheduler::instance().run(); } - state_ = LOCKED; if ( this_fiber::is_fiberized() ) owner_ = detail::scheduler::instance().active()->get_id(); else @@ -51,8 +50,7 @@ mutex::lock() bool mutex::try_lock() { - if ( LOCKED == state_) return false; - state_ = LOCKED; + if ( LOCKED == state_.exchange( LOCKED, memory_order_acquire) ) return false; if ( this_fiber::is_fiberized() ) owner_ = detail::scheduler::instance().active()->get_id(); else @@ -74,6 +72,9 @@ mutex::unlock() std::abort(); } + owner_ = detail::fiber_base::id(); + state_.store( UNLOCKED); + if ( ! waiting_.empty() ) { detail::fiber_base::ptr_t f; @@ -81,12 +82,10 @@ mutex::unlock() { f.swap( waiting_.front() ); waiting_.pop_front(); - } while ( f->is_complete() ); + } while ( f->is_terminated() ); if ( f) detail::scheduler::instance().notify( f); } - state_ = UNLOCKED; - owner_ = detail::fiber_base::id(); } }} diff --git a/src/round_robin.cpp b/src/round_robin.cpp index 3c3037cc..e654ed61 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -23,10 +23,10 @@ #define RESUME_FIBER( f_) \ BOOST_ASSERT( f_); \ - BOOST_ASSERT( ! f_->is_complete() ); \ - BOOST_ASSERT( ! f_->is_resumed() ); \ + BOOST_ASSERT( ! f_->is_terminated() ); \ + BOOST_ASSERT( ! f_->is_running() ); \ f_->resume(); \ - BOOST_ASSERT( ! f_->is_resumed() ); + BOOST_ASSERT( ! f_->is_running() ); namespace boost { namespace fibers { @@ -43,7 +43,7 @@ void round_robin::spawn( detail::fiber_base::ptr_t const& f) { BOOST_ASSERT( f); - BOOST_ASSERT( ! f->is_complete() ); + BOOST_ASSERT( ! f->is_terminated() ); BOOST_ASSERT( f != active_fiber_); detail::fiber_base::ptr_t tmp = active_fiber_; @@ -75,17 +75,17 @@ round_robin::join( detail::fiber_base::ptr_t const& f) // detail::fiber_base::join() calls round_robin::wait() // so that active-fiber gets suspended f->join( active_fiber_); - // suspend active-fiber until f becomes complete + // suspend active-fiber until f terminates wait(); - // f is complete and active-fiber is resumed + // f has teminated and active-fiber is resumed } else { - while ( ! f->is_complete() ) + while ( ! f->is_terminated() ) run(); } - BOOST_ASSERT( f->is_complete() ); + BOOST_ASSERT( f->is_terminated() ); } void @@ -95,7 +95,7 @@ round_robin::cancel( detail::fiber_base::ptr_t const& f) BOOST_ASSERT( f != active_fiber_); // ignore completed fiber - if ( f->is_complete() ) return; + if ( f->is_terminated() ) return; detail::fiber_base::ptr_t tmp = active_fiber_; { @@ -111,15 +111,15 @@ round_robin::cancel( detail::fiber_base::ptr_t const& f) // erase completed fiber from waiting-queue f_idx_.erase( f); - BOOST_ASSERT( f->is_complete() ); + BOOST_ASSERT( f->is_terminated() ); } void round_robin::notify( detail::fiber_base::ptr_t const& f) { BOOST_ASSERT( f); - BOOST_ASSERT( ! f->is_complete() ); - BOOST_ASSERT( ! f->is_resumed() ); + BOOST_ASSERT( ! f->is_terminated() ); + BOOST_ASSERT( ! f->is_running() ); BOOST_ASSERT( f != active_fiber_); // remove fiber from wait-queue @@ -127,8 +127,8 @@ round_robin::notify( detail::fiber_base::ptr_t const& f) // push fiber at the front of the runnable-queue rqueue_.push_front( f); - BOOST_ASSERT( ! f->is_complete() ); - BOOST_ASSERT( ! f->is_resumed() ); + BOOST_ASSERT( ! f->is_terminated() ); + BOOST_ASSERT( ! f->is_running() ); BOOST_ASSERT( f != active_fiber_); } @@ -155,7 +155,7 @@ round_robin::run() rqueue_.pop_front(); BOOST_ASSERT( f_idx_.end() == f_idx_.find( f) ); } - while ( f->is_complete() ); + while ( f->is_terminated() ); //FIXME: test for state_terminated correct? detail::fiber_base::ptr_t tmp = active_fiber_; BOOST_SCOPE_EXIT( & tmp, & active_fiber_) { active_fiber_ = tmp; @@ -170,41 +170,37 @@ void round_robin::wait() { BOOST_ASSERT( active_fiber_); - BOOST_ASSERT( ! active_fiber_->is_complete() ); - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); // fiber will be added to waiting-queue f_idx_.insert( schedulable( active_fiber_) ); // suspend fiber active_fiber_->suspend(); - // fiber was notified + // fiber is resumed - BOOST_ASSERT( ! active_fiber_->is_complete() ); - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); } void round_robin::yield() { BOOST_ASSERT( active_fiber_); - BOOST_ASSERT( ! active_fiber_->is_complete() ); - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); // yield() suspends the fiber and adds it // immediately to runnable-queue rqueue_.push_back( active_fiber_); - active_fiber_->suspend(); + active_fiber_->yield(); // fiber is resumed - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); } void round_robin::sleep( chrono::system_clock::time_point const& abs_time) { BOOST_ASSERT( active_fiber_); - BOOST_ASSERT( ! active_fiber_->is_complete() ); - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); if ( abs_time > chrono::system_clock::now() ) { @@ -212,17 +208,17 @@ round_robin::sleep( chrono::system_clock::time_point const& abs_time) // each call of run() will check if dead-line has reached wqueue_.insert( schedulable( active_fiber_, abs_time) ); active_fiber_->suspend(); - // fiber was resumed, dead-line has been reached + // fiber is resumed, dead-line has been reached } - BOOST_ASSERT( ! active_fiber_->is_complete() ); - BOOST_ASSERT( active_fiber_->is_resumed() ); + BOOST_ASSERT( active_fiber_->is_running() ); } void round_robin::migrate_to( detail::fiber_base::ptr_t const& f) { BOOST_ASSERT( f); + BOOST_ASSERT( f->is_ready() ); rqueue_.push_back( f); } @@ -236,6 +232,7 @@ round_robin::migrate_from() { f.swap( rqueue_.front() ); rqueue_.pop_front(); + BOOST_ASSERT( f->is_ready() ); } return f;