diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp index 0857f574..88964ca9 100644 --- a/include/boost/fiber/condition.hpp +++ b/include/boost/fiber/condition.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -19,8 +20,8 @@ #include #include #include +#include #include -#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -66,29 +67,34 @@ public: // check if context was interrupted this_fiber::interruption_point(); context * ctx = context::active(); + typename LockType::mutex_type * mtx = lt.mutex(); + if ( ctx != mtx->owner_) { + throw lock_error( static_cast< int >( std::errc::operation_not_permitted), + "boost fiber: no privilege to perform the operation"); + } + // store this fiber in waiting-queue + // in order notify (resume) this fiber later + detail::spinlock_lock lk( wait_queue_splk_); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + // unlock external + //lt.unlock(); + std::function< void() > func([&lk,&lt](){ + lk.unlock(); + lt.unlock(); + }); + // suspend this fiber + ctx->suspend( & func); try { - // store this fiber in waiting-queue - // in order notify (resume) this fiber later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // unlock external - lt.unlock(); - // suspend this fiber - ctx->suspend(); - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); // check if context was interrupted this_fiber::interruption_point(); - // lock external again before returning - lt.lock(); - } catch (...) { - detail::spinlock_lock lk( wait_queue_splk_); + } catch ( ... ) { ctx->wait_unlink(); throw; } + BOOST_ASSERT( ! 
ctx->wait_is_linked() ); + // lock external again before returning + lt.lock(); } template< typename LockType, typename Clock, typename Duration > @@ -99,32 +105,37 @@ public: std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); context * ctx = context::active(); - try { - // store this fiber in waiting-queue - // in order notify (resume) this fiber later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // unlock external - lt.unlock(); - // suspend this fiber - if ( ! ctx->wait_until( timeout_time) ) { - status = cv_status::timeout; - } - // remove fiber from wait-queue - lk.lock(); + typename LockType::mutex_type * mtx = lt.mutex(); + if ( ctx != mtx->owner_) { + throw lock_error( static_cast< int >( std::errc::operation_not_permitted), + "boost fiber: no privilege to perform the operation"); + } + // store this fiber in waiting-queue + // in order notify (resume) this fiber later + detail::spinlock_lock lk( wait_queue_splk_); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + // unlock external + //lt.unlock(); + std::function< void() > func([&lk,&lt](){ + lk.unlock(); + lt.unlock(); + }); + // suspend this fiber + if ( ! ctx->wait_until( timeout_time, & func) ) { + status = cv_status::timeout; ctx->wait_unlink(); + } + try { // check if context was interrupted this_fiber::interruption_point(); - // lock external again before returning - lt.lock(); - } catch (...) { - detail::spinlock_lock lk( wait_queue_splk_); + } catch ( ... ) { ctx->wait_unlink(); throw; } - + BOOST_ASSERT( ! 
ctx->wait_is_linked() ); + // lock external again before returning + lt.lock(); return status; } diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index c3f22ecf..8c072aa5 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -109,14 +110,6 @@ typedef intrusive::list_member_hook< > > worker_hook; -struct yield_tag; -typedef intrusive::list_member_hook< - intrusive::tag< yield_tag >, - intrusive::link_mode< - intrusive::auto_unlink - > -> yield_hook; - } struct main_context_t {}; @@ -185,7 +178,6 @@ public: detail::terminated_hook terminated_hook_; detail::wait_hook wait_hook_; detail::worker_hook worker_hook_; - detail::yield_hook yield_hook_; std::chrono::steady_clock::time_point tp_; typedef intrusive::list< @@ -283,12 +275,16 @@ public: // mutable: generated operator() is not const -> enables std::move( fn) // std::make_tuple: stores decayed copies of its args, implicitly unwraps std::reference_wrapper [=,fn_=std::forward< Fn >( fn),tpl_=std::make_tuple( std::forward< Args >( args) ...), - ctx=boost::context::execution_context::current()] () mutable -> void { + ctx=boost::context::execution_context::current()] (void *) mutable -> void { try { auto fn( std::move( fn_) ); auto tpl( std::move( tpl_) ); // jump back after initialization - ctx(); + void * vp = ctx(); + if ( nullptr != vp) { + std::function< void() > * func( static_cast< std::function< void() > * >( vp) ); + ( * func)(); + } // check for unwinding if ( ! 
unwinding_requested() ) { boost::context::detail::do_invoke( fn, tpl); @@ -312,7 +308,6 @@ public: terminated_hook_(), wait_hook_(), worker_hook_(), - yield_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), @@ -328,9 +323,9 @@ public: id get_id() const noexcept; - void resume(); + std::function< void() > * resume( std::function< void() > *); - void suspend() noexcept; + void suspend( std::function< void() > * = nullptr) noexcept; void release() noexcept; @@ -338,7 +333,8 @@ public: void yield() noexcept; - bool wait_until( std::chrono::steady_clock::time_point const&) noexcept; + bool wait_until( std::chrono::steady_clock::time_point const&, + std::function< void() > * = nullptr) noexcept; void set_ready( context *) noexcept; @@ -402,8 +398,6 @@ public: bool worker_is_linked() const; - bool yield_is_linked() const; - template< typename List > void ready_link( List & lst) noexcept { lst.push_back( * this); @@ -434,11 +428,6 @@ public: lst.push_back( * this); } - template< typename List > - void yield_link( List & lst) noexcept { - lst.push_back( * this); - } - void ready_unlink() noexcept; void remote_ready_unlink() noexcept; @@ -449,8 +438,6 @@ public: void worker_unlink() noexcept; - void yield_unlink() noexcept; - void attach( context *); void detach( context *); diff --git a/include/boost/fiber/mutex.hpp b/include/boost/fiber/mutex.hpp index 669d823e..e279ebc6 100644 --- a/include/boost/fiber/mutex.hpp +++ b/include/boost/fiber/mutex.hpp @@ -7,8 +7,6 @@ #ifndef BOOST_FIBERS_MUTEX_H #define BOOST_FIBERS_MUTEX_H -#include - #include #include @@ -22,22 +20,18 @@ namespace boost { namespace fibers { +class condition; + class BOOST_FIBERS_DECL mutex { private: - enum class mutex_status { - locked = 0, - unlocked - }; + friend class condition; typedef context::wait_queue_t wait_queue_t; - std::atomic< mutex_status > state_; - std::atomic< context * > owner_; + context * owner_; wait_queue_t wait_queue_; detail::spinlock 
wait_queue_splk_; - bool lock_if_unlocked_(); - public: mutex(); diff --git a/include/boost/fiber/recursive_mutex.hpp b/include/boost/fiber/recursive_mutex.hpp index 552a8711..d9baa8bb 100644 --- a/include/boost/fiber/recursive_mutex.hpp +++ b/include/boost/fiber/recursive_mutex.hpp @@ -9,7 +9,6 @@ #ifndef BOOST_FIBERS_RECURSIVE_MUTEX_H #define BOOST_FIBERS_RECURSIVE_MUTEX_H -#include #include #include @@ -25,23 +24,19 @@ namespace boost { namespace fibers { +class condition; + class BOOST_FIBERS_DECL recursive_mutex { private: - enum class mutex_status { - locked = 0, - unlocked - }; + friend class condition; typedef context::wait_queue_t wait_queue_t; - std::atomic< mutex_status > state_; - std::atomic< context * > owner_; + context * owner_; std::size_t count_; wait_queue_t wait_queue_; detail::spinlock wait_queue_splk_; - bool lock_if_unlocked_(); - public: recursive_mutex(); diff --git a/include/boost/fiber/recursive_timed_mutex.hpp b/include/boost/fiber/recursive_timed_mutex.hpp index 64b0fb05..061f3aed 100644 --- a/include/boost/fiber/recursive_timed_mutex.hpp +++ b/include/boost/fiber/recursive_timed_mutex.hpp @@ -9,7 +9,6 @@ #ifndef BOOST_FIBERS_RECURSIVE_TIMED_MUTEX_H #define BOOST_FIBERS_RECURSIVE_TIMED_MUTEX_H -#include #include #include @@ -27,23 +26,19 @@ namespace boost { namespace fibers { +class condition; + class BOOST_FIBERS_DECL recursive_timed_mutex { private: - enum class mutex_status { - locked = 0, - unlocked - }; + friend class condition; typedef context::wait_queue_t wait_queue_t; - std::atomic< mutex_status > state_; - std::atomic< context * > owner_; + context * owner_; std::size_t count_; wait_queue_t wait_queue_; detail::spinlock wait_queue_splk_; - bool lock_if_unlocked_(); - bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time); public: diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index 283d248b..8cdee185 100644 --- a/include/boost/fiber/scheduler.hpp +++ 
b/include/boost/fiber/scheduler.hpp @@ -7,7 +7,9 @@ #define BOOST_FIBERS_FIBER_MANAGER_H #include +#include #include +#include #include #include @@ -44,11 +46,6 @@ public: intrusive::member_hook< context, detail::remote_ready_hook, & context::remote_ready_hook_ >, intrusive::constant_time_size< false > > remote_ready_queue_t; - typedef intrusive::list< - context, - intrusive::member_hook< - context, detail::yield_hook, & context::yield_hook_ >, - intrusive::constant_time_size< false > > yield_queue_t; typedef intrusive::set< context, intrusive::member_hook< @@ -85,12 +82,11 @@ private: remote_ready_queue_t remote_ready_queue_; // sleep-queue cotnains context' whic hahve been called // scheduler::wait_until() - yield_queue_t yield_queue_; sleep_queue_t sleep_queue_; bool shutdown_; detail::spinlock remote_ready_splk_; - void resume_( context *, context *); + void resume_( context *, context *, std::function< void() > *); context * get_next_() noexcept; @@ -98,8 +94,6 @@ private: void remote_ready2ready_(); - void yield2ready_(); - void sleep2ready_() noexcept; public: @@ -120,9 +114,12 @@ public: void yield( context *) noexcept; - bool wait_until( context *, std::chrono::steady_clock::time_point const&) noexcept; + bool wait_until( context *, + std::chrono::steady_clock::time_point const&, + std::function< void() > * = nullptr) noexcept; - void re_schedule( context *) noexcept; + void re_schedule( context *, + std::function< void() > * = nullptr) noexcept; bool has_ready_fibers() const noexcept; diff --git a/include/boost/fiber/timed_mutex.hpp b/include/boost/fiber/timed_mutex.hpp index e4d5ddbe..c78c10b7 100644 --- a/include/boost/fiber/timed_mutex.hpp +++ b/include/boost/fiber/timed_mutex.hpp @@ -7,7 +7,6 @@ #ifndef BOOST_FIBERS_TIMED_MUTEX_H #define BOOST_FIBERS_TIMED_MUTEX_H -#include #include #include @@ -24,22 +23,18 @@ namespace boost { namespace fibers { +class condition; + class BOOST_FIBERS_DECL timed_mutex { private: - enum class mutex_status { - locked = 
0, - unlocked - }; + friend class condition; typedef context::wait_queue_t wait_queue_t; - std::atomic< mutex_status > state_; - std::atomic< context * > owner_; + context * owner_; wait_queue_t wait_queue_; detail::spinlock wait_queue_splk_; - bool lock_if_unlocked_(); - bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time); public: diff --git a/src/condition.cpp b/src/condition.cpp index 7be9bb91..1963526e 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -26,18 +26,15 @@ condition::~condition() { void condition::notify_one() { - context * ctx( nullptr); // get one context' from wait-queue detail::spinlock_lock lk( wait_queue_splk_); - if ( ! wait_queue_.empty() ) { - ctx = & wait_queue_.front(); - wait_queue_.pop_front(); + if ( wait_queue_.empty() ) { + return; } - lk.unlock(); + context * ctx = & wait_queue_.front(); + wait_queue_.pop_front(); // notify context - if ( nullptr != ctx) { - context::active()->set_ready( ctx); - } + context::active()->set_ready( ctx); } void @@ -45,13 +42,10 @@ condition::notify_all() { wait_queue_t tmp; // get all context' from wait-queue detail::spinlock_lock lk( wait_queue_splk_); - tmp.swap( wait_queue_); - lk.unlock(); // notify all context' - while ( ! tmp.empty() ) { - context * ctx = & tmp.front(); - tmp.pop_front(); - BOOST_ASSERT( nullptr != ctx); + while ( ! 
wait_queue_.empty() ) { + context * ctx = & wait_queue_.front(); + wait_queue_.pop_front(); context::active()->set_ready( ctx); } } diff --git a/src/context.cpp b/src/context.cpp index 3ad3a781..142a2330 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -172,7 +172,6 @@ context::context( main_context_t) : terminated_hook_(), wait_hook_(), worker_hook_(), - yield_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), @@ -187,7 +186,11 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall flags_( flag_dispatcher_context), scheduler_( nullptr), ctx_( std::allocator_arg, palloc, salloc, - [=] () -> void { + [=] (void * vp) -> void { + if ( nullptr != vp) { + std::function< void() > * func( static_cast< std::function< void() > * >( vp) ); + ( * func)(); + } // execute scheduler::dispatch() sched->dispatch(); // dispatcher context should never return from scheduler::dispatch() @@ -199,7 +202,6 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall terminated_hook_(), wait_hook_(), worker_hook_(), - yield_hook_(), tp_( (std::chrono::steady_clock::time_point::max)() ), fss_data_(), wait_queue_(), @@ -226,20 +228,19 @@ context::get_id() const noexcept { return id( const_cast< context * >( this) ); } -void -context::resume() { - ctx_(); +std::function< void() > * +context::resume( std::function< void() > * func) { + return static_cast< std::function< void() > * >( ctx_( func) ); } void -context::suspend() noexcept { - scheduler_->re_schedule( this); +context::suspend( std::function< void() > * func) noexcept { + scheduler_->re_schedule( this, func); } void context::release() noexcept { BOOST_ASSERT( is_terminated() ); - wait_queue_t tmp; // protect for concurrent access std::unique_lock< detail::spinlock > lk( splk_); @@ -296,11 +297,11 @@ context::yield() noexcept { } bool -context::wait_until( std::chrono::steady_clock::time_point const& tp) noexcept { +context::wait_until( 
std::chrono::steady_clock::time_point const& tp, + std::function< void() > * func) noexcept { BOOST_ASSERT( nullptr != scheduler_); BOOST_ASSERT( this == active_); - - return scheduler_->wait_until( this, tp); + return scheduler_->wait_until( this, tp, func); } void @@ -409,11 +410,6 @@ context::remote_ready_is_linked() const { return remote_ready_hook_.is_linked(); } -bool -context::yield_is_linked() const { - return yield_hook_.is_linked(); -} - bool context::sleep_is_linked() const { return sleep_hook_.is_linked(); @@ -439,11 +435,6 @@ context::remote_ready_unlink() noexcept { remote_ready_hook_.unlink(); } -void -context::yield_unlink() noexcept { - yield_hook_.unlink(); -} - void context::sleep_unlink() noexcept { sleep_hook_.unlink(); diff --git a/src/mutex.cpp b/src/mutex.cpp index c8007b0d..f9b15d41 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -7,9 +7,12 @@ #include "boost/fiber/mutex.hpp" #include +#include +#include #include +#include "boost/fiber/exceptions.hpp" #include "boost/fiber/scheduler.hpp" #ifdef BOOST_HAS_ABI_HEADERS @@ -19,84 +22,68 @@ namespace boost { namespace fibers { -bool -mutex::lock_if_unlocked_() { - if ( mutex_status::locked == state_.load( std::memory_order_relaxed) ) { - return false; - } - if ( mutex_status::unlocked != state_.exchange( mutex_status::locked, std::memory_order_acquire) ) { - return false; - } - BOOST_ASSERT( nullptr == owner_.load()); - owner_ = context::active(); - return true; -} - mutex::mutex() : - state_( mutex_status::unlocked), owner_( nullptr), wait_queue_(), wait_queue_splk_() { } mutex::~mutex() { - BOOST_ASSERT( nullptr == owner_.load()); + BOOST_ASSERT( nullptr == owner_); BOOST_ASSERT( wait_queue_.empty() ); } void mutex::lock() { context * ctx = context::active(); - for (;;) { - try { - if ( lock_if_unlocked_() ) { - return; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! 
ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber - ctx->suspend(); - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) { - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + throw lock_error( static_cast< int >( std::errc::resource_deadlock_would_occur), + "boost fiber: a deadlock is detected"); + } else if ( nullptr == owner_) { + owner_ = ctx; + return; } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber + ctx->suspend( & func); + BOOST_ASSERT( ! ctx->wait_is_linked() ); } bool mutex::try_lock() { - if ( lock_if_unlocked_() ) { - return true; + context * ctx = context::active(); + detail::spinlock_lock lk( wait_queue_splk_); + if ( nullptr == owner_) { + owner_ = ctx; } + lk.unlock(); // let other fiber release the lock context::active()->yield(); - return false; + return ctx == owner_; } void mutex::unlock() { - BOOST_ASSERT( mutex_status::locked == state_); - BOOST_ASSERT( context::active() == owner_); + context * ctx = context::active(); detail::spinlock_lock lk( wait_queue_splk_); - context * ctx( nullptr); +// if ( ctx != owner_) { +// throw lock_error( static_cast< int >( std::errc::operation_not_permitted), +// "boost fiber: no privilege to perform the operation"); +// } if ( ! 
wait_queue_.empty() ) { - ctx = & wait_queue_.front(); + context * ctx = & wait_queue_.front(); wait_queue_.pop_front(); - BOOST_ASSERT( nullptr != ctx); - } - lk.unlock(); - owner_ = nullptr; - state_.store( mutex_status::unlocked, std::memory_order_release); - if ( nullptr != ctx) { + owner_ = ctx; context::active()->set_ready( ctx); + } else { + owner_ = nullptr; + return; } } diff --git a/src/recursive_mutex.cpp b/src/recursive_mutex.cpp index 94c1ebfb..b8a61cd4 100644 --- a/src/recursive_mutex.cpp +++ b/src/recursive_mutex.cpp @@ -7,9 +7,11 @@ #include "boost/fiber/recursive_mutex.hpp" #include +#include #include +#include "boost/fiber/exceptions.hpp" #include "boost/fiber/scheduler.hpp" #ifdef BOOST_HAS_ABI_HEADERS @@ -19,32 +21,7 @@ namespace boost { namespace fibers { -bool -recursive_mutex::lock_if_unlocked_() { - if ( mutex_status::locked == state_.load( std::memory_order_relaxed) ) { - if ( context::active() == owner_) { - ++count_; - return true; - } else { - return false; - } - } - if ( mutex_status::unlocked != state_.exchange( mutex_status::locked, std::memory_order_acquire) ) { - if ( context::active() == owner_) { - ++count_; - return true; - } else { - return false; - } - } - BOOST_ASSERT( nullptr == owner_.load()); - owner_ = context::active(); - ++count_; - return true; -} - recursive_mutex::recursive_mutex() : - state_( mutex_status::unlocked), owner_( nullptr), count_( 0), wait_queue_(), @@ -52,65 +29,68 @@ recursive_mutex::recursive_mutex() : } recursive_mutex::~recursive_mutex() { - BOOST_ASSERT( nullptr == owner_.load()); + BOOST_ASSERT( nullptr == owner_); BOOST_ASSERT( 0 == count_); BOOST_ASSERT( wait_queue_.empty() ); } void recursive_mutex::lock() { - context * ctx = context::active(); - for (;;) { - try { - if ( lock_if_unlocked_() ) { - return; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! 
ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber - ctx->suspend(); - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) { - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + context * ctx = context::active(); + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + ++count_; + return; + } else if ( nullptr == owner_) { + owner_ = ctx; + count_ = 1; + return; } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber + ctx->suspend( & func); + BOOST_ASSERT( ! ctx->wait_is_linked() ); } bool recursive_mutex::try_lock() { - if ( lock_if_unlocked_() ) { - return true; + context * ctx = context::active(); + detail::spinlock_lock lk( wait_queue_splk_); + if ( nullptr == owner_) { + owner_ = ctx; + count_ = 1; + } else if ( ctx == owner_) { + ++count_; } + lk.unlock(); // let other fiber release the lock context::active()->yield(); - return false; + return ctx == owner_; } void recursive_mutex::unlock() { - BOOST_ASSERT( mutex_status::locked == state_); - BOOST_ASSERT( context::active() == owner_); + context * ctx = context::active(); detail::spinlock_lock lk( wait_queue_splk_); - context * ctx( nullptr); + if ( ctx != owner_) { + throw lock_error( static_cast< int >( std::errc::operation_not_permitted), + "boost fiber: no privilege to perform the operation"); + } if ( 0 == --count_) { if ( ! 
wait_queue_.empty() ) { - ctx = & wait_queue_.front(); + context * ctx = & wait_queue_.front(); wait_queue_.pop_front(); - BOOST_ASSERT( nullptr != ctx); - } - lk.unlock(); - owner_ = nullptr; - state_.store( mutex_status::unlocked, std::memory_order_release); - if ( nullptr != ctx) { + owner_ = ctx; + count_ = 1; context::active()->set_ready( ctx); + } else { + owner_ = nullptr; + return; } } } diff --git a/src/recursive_timed_mutex.cpp b/src/recursive_timed_mutex.cpp index 909e4bb3..e7291b92 100644 --- a/src/recursive_timed_mutex.cpp +++ b/src/recursive_timed_mutex.cpp @@ -7,11 +7,12 @@ #include "boost/fiber/recursive_timed_mutex.hpp" #include +#include #include +#include "boost/fiber/exceptions.hpp" #include "boost/fiber/scheduler.hpp" -#include "boost/fiber/interruption.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -20,32 +21,7 @@ namespace boost { namespace fibers { -bool -recursive_timed_mutex::lock_if_unlocked_() { - if ( mutex_status::locked == state_.load( std::memory_order_relaxed) ) { - if ( context::active() == owner_) { - ++count_; - return true; - } else { - return false; - } - } - if ( mutex_status::unlocked != state_.exchange( mutex_status::locked, std::memory_order_acquire) ) { - if ( context::active() == owner_) { - ++count_; - return true; - } else { - return false; - } - } - BOOST_ASSERT( nullptr == owner_.load()); - owner_ = context::active(); - ++count_; - return true; -} - recursive_timed_mutex::recursive_timed_mutex() : - state_( mutex_status::unlocked), owner_( nullptr), count_( 0), wait_queue_(), @@ -53,7 +29,7 @@ recursive_timed_mutex::recursive_timed_mutex() : } recursive_timed_mutex::~recursive_timed_mutex() { - BOOST_ASSERT( nullptr == owner_.load()); + BOOST_ASSERT( nullptr == owner_); BOOST_ASSERT( 0 == count_); BOOST_ASSERT( wait_queue_.empty() ); } @@ -61,92 +37,92 @@ recursive_timed_mutex::~recursive_timed_mutex() { void recursive_timed_mutex::lock() { context * ctx = context::active(); - for (;;) { - try { - if 
( lock_if_unlocked_() ) { - return; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber - ctx->suspend(); - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) { - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + ++count_; + return; + } else if ( nullptr == owner_) { + owner_ = ctx; + count_ = 1; + return; } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber + ctx->suspend( & func); + BOOST_ASSERT( ! ctx->wait_is_linked() ); } bool recursive_timed_mutex::try_lock() { - if ( lock_if_unlocked_() ) { - return true; + context * ctx = context::active(); + detail::spinlock_lock lk( wait_queue_splk_); + if ( nullptr == owner_) { + owner_ = ctx; + count_ = 1; + } else if ( ctx == owner_) { + ++count_; } + lk.unlock(); // let other fiber release the lock context::active()->yield(); - return false; + return ctx == owner_; } bool recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) { - context * ctx = context::active(); - for (;;) { - try { - if ( std::chrono::steady_clock::now() > timeout_time) { - return false; - } - if ( lock_if_unlocked_() ) { - return true; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber until notified or timed-out - if ( ! 
ctx->wait_until( timeout_time) ) { - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - return false; - } - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) { - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + if ( std::chrono::steady_clock::now() > timeout_time) { + return false; } + context * ctx = context::active(); + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + ++count_; + return true; + } else if ( nullptr == owner_) { + owner_ = ctx; + count_ = 1; + return true; + } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber until notified or timed-out + if ( ! context::active()->wait_until( timeout_time, & func) ) { + // remove fiber from wait-queue + lk.lock(); + ctx->wait_unlink(); + return false; + } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + return ctx == owner_; } void recursive_timed_mutex::unlock() { - BOOST_ASSERT( mutex_status::locked == state_); - BOOST_ASSERT( context::active() == owner_); + context * ctx = context::active(); detail::spinlock_lock lk( wait_queue_splk_); - context * ctx( nullptr); + if ( ctx != owner_) { + throw lock_error( static_cast< int >( std::errc::operation_not_permitted), + "boost fiber: no privilege to perform the operation"); + } if ( 0 == --count_) { if ( ! 
wait_queue_.empty() ) { - ctx = & wait_queue_.front(); + context * ctx = & wait_queue_.front(); wait_queue_.pop_front(); - BOOST_ASSERT( nullptr != ctx); - } - lk.unlock(); - owner_ = nullptr; - state_.store( mutex_status::unlocked, std::memory_order_release); - if ( nullptr != ctx) { + owner_ = ctx; + count_ = 1; context::active()->set_ready( ctx); + } else { + owner_ = nullptr; + return; } } } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 81263fc3..9a62a4ac 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -23,7 +23,7 @@ namespace boost { namespace fibers { void -scheduler::resume_( context * active_ctx, context * ctx) { +scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > * func) { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( main_ctx_ == active_ctx || @@ -43,19 +43,14 @@ scheduler::resume_( context * active_ctx, context * ctx) { // assign new fiber to active-fiber context::active( ctx); // resume active-fiber == ctx - ctx->resume(); + func = ctx->resume( func); BOOST_ASSERT( context::active() == active_ctx); BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); - // move yielded context' to ready-queue - // note that we cann't call simply yield2ready_() - // because the context might be moved to another - // thread during suspension - // `this`might be cached and thus points to the - // previous scheduler (cotnaiend in the previous thread) - active_ctx->get_scheduler()->yield2ready_(); - // check if unwinding was requested + if ( nullptr != func) { + ( * func)(); + } if ( active_ctx->unwinding_requested() ) { throw forced_unwind(); } @@ -107,17 +102,6 @@ scheduler::remote_ready2ready_() { } } -void -scheduler::yield2ready_() { - // get context from yield-queue - while ( ! yield_queue_.empty() ) { - context * ctx = & yield_queue_.front(); - yield_queue_.pop_front(); - BOOST_ASSERT( ! 
ctx->ready_is_linked() ); - set_ready( ctx); - } -} - void scheduler::sleep2ready_() noexcept { // move context which the deadline has reached @@ -157,7 +141,6 @@ scheduler::scheduler() noexcept : worker_queue_(), terminated_queue_(), remote_ready_queue_(), - yield_queue_(), sleep_queue_(), shutdown_( false), remote_ready_splk_() { @@ -170,13 +153,12 @@ scheduler::~scheduler() noexcept { // signal dispatcher-context termination shutdown_ = true; // resume pending fibers - resume_( main_ctx_, get_next_() ); + resume_( main_ctx_, get_next_(), nullptr); // no context' in worker-queue BOOST_ASSERT( worker_queue_.empty() ); BOOST_ASSERT( terminated_queue_.empty() ); BOOST_ASSERT( ! sched_algo_->has_ready_fibers() ); BOOST_ASSERT( remote_ready_queue_.empty() ); - BOOST_ASSERT( yield_queue_.empty() ); BOOST_ASSERT( sleep_queue_.empty() ); // set active context to nullptr context::reset_active(); @@ -190,8 +172,6 @@ void scheduler::dispatch() { BOOST_ASSERT( context::active() == dispatcher_ctx_); while ( ! 
shutdown_) { - // move yielded context' to ready-queue - yield2ready_(); // release termianted context' release_terminated_(); // get sleeping context' @@ -224,7 +204,7 @@ scheduler::dispatch() { // push dispatcher-context to ready-queue // so that ready-queue never becomes empty sched_algo_->awakened( dispatcher_ctx_.get() ); - resume_( dispatcher_ctx_.get(), ctx); + resume_( dispatcher_ctx_.get(), ctx, nullptr); BOOST_ASSERT( context::active() == dispatcher_ctx_.get() ); } // loop till all context' have been terminated @@ -247,7 +227,7 @@ scheduler::dispatch() { // release termianted context' release_terminated_(); // return to main-context - resume_( dispatcher_ctx_.get(), main_ctx_); + resume_( dispatcher_ctx_.get(), main_ctx_, nullptr); } void @@ -322,18 +302,22 @@ scheduler::yield( context * active_ctx) noexcept { // we do not test for wait-queue because // context::wait_is_linked() is not sychronized // with other threads - // push active context to yield-queue + // defer passing active context to set_ready() // in work-sharing context (multiple threads read // from one ready-queue) the context must be // already suspended until another thread resumes it - active_ctx->yield_link( yield_queue_); + // (== maked as ready) + std::function< void() > func([=](){ + set_ready( active_ctx); + }); // resume another fiber - resume_( active_ctx, get_next_() ); + resume_( active_ctx, get_next_(), & func); } bool scheduler::wait_until( context * active_ctx, - std::chrono::steady_clock::time_point const& sleep_tp) noexcept { + std::chrono::steady_clock::time_point const& sleep_tp, + std::function< void() > * func) noexcept { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || @@ -356,20 +340,21 @@ scheduler::wait_until( context * active_ctx, active_ctx->tp_ = sleep_tp; active_ctx->sleep_link( sleep_queue_); // resume another context - resume_( active_ctx, get_next_() ); + resume_( active_ctx, get_next_(), 
func); // context has been resumed // check if deadline has reached return std::chrono::steady_clock::now() < sleep_tp; } void -scheduler::re_schedule( context * active_ctx) noexcept { +scheduler::re_schedule( context * active_ctx, + std::function< void() > * func) noexcept { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() ); // resume another context - resume_( active_ctx, get_next_() ); + resume_( active_ctx, get_next_(), func); } bool @@ -424,7 +409,6 @@ scheduler::attach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); BOOST_ASSERT( ! ctx->worker_is_linked() ); - BOOST_ASSERT( ! ctx->yield_is_linked() ); ctx->worker_link( worker_queue_); ctx->scheduler_ = this; } @@ -437,7 +421,6 @@ scheduler::detach_worker_context( context * ctx) noexcept { BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->terminated_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); - BOOST_ASSERT( ! ctx->yield_is_linked() ); BOOST_ASSERT( ! 
ctx->wait_is_linked() ); ctx->worker_unlink(); } diff --git a/src/timed_mutex.cpp b/src/timed_mutex.cpp index 8ecc5e1f..c7c21538 100644 --- a/src/timed_mutex.cpp +++ b/src/timed_mutex.cpp @@ -7,11 +7,12 @@ #include "boost/fiber/timed_mutex.hpp" #include +#include #include +#include "boost/fiber/exceptions.hpp" #include "boost/fiber/scheduler.hpp" -#include "boost/fiber/interruption.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -20,119 +21,99 @@ namespace boost { namespace fibers { -bool -timed_mutex::lock_if_unlocked_() { - if ( mutex_status::locked == state_.load( std::memory_order_relaxed) ) { - return false; - } - if ( mutex_status::unlocked != state_.exchange( mutex_status::locked, std::memory_order_acquire) ) { - return false; - } - BOOST_ASSERT( nullptr == owner_.load()); - owner_ = context::active(); - return true; -} - timed_mutex::timed_mutex() : - state_( mutex_status::unlocked), owner_( nullptr), wait_queue_(), wait_queue_splk_() { } timed_mutex::~timed_mutex() { - BOOST_ASSERT( nullptr == owner_.load()); + BOOST_ASSERT( nullptr == owner_); BOOST_ASSERT( wait_queue_.empty() ); } void timed_mutex::lock() { context * ctx = context::active(); - for (;;) { - try { - if ( lock_if_unlocked_() ) { - return; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber - ctx->suspend(); - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) 
{ - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + throw lock_error( static_cast< int >( std::errc::resource_deadlock_would_occur), + "boost fiber: a deadlock is detected"); + } else if ( nullptr == owner_) { + owner_ = ctx; + return; } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber + ctx->suspend( & func); + BOOST_ASSERT( ! ctx->wait_is_linked() ); } bool timed_mutex::try_lock() { - if ( lock_if_unlocked_() ) { - return true; + context * ctx = context::active(); + detail::spinlock_lock lk( wait_queue_splk_); + if ( nullptr == owner_) { + owner_ = ctx; } + lk.unlock(); // let other fiber release the lock context::active()->yield(); - return false; + return ctx == owner_; } bool timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) { - context * ctx = context::active(); - for (;;) { - try { - if ( std::chrono::steady_clock::now() > timeout_time) { - return false; - } - if ( lock_if_unlocked_() ) { - return true; - } - // store this fiber in order to be notified later - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); - lk.unlock(); - // suspend this fiber until notified or timed-out - if ( ! context::active()->wait_until( timeout_time) ) { - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - return false; - } - // remove fiber from wait-queue - lk.lock(); - ctx->wait_unlink(); - } catch (...) 
{ - // remove fiber from wait-queue - detail::spinlock_lock lk( wait_queue_splk_); - ctx->wait_unlink(); - throw; - } + if ( std::chrono::steady_clock::now() > timeout_time) { + return false; } + context * ctx = context::active(); + // store this fiber in order to be notified later + detail::spinlock_lock lk( wait_queue_splk_); + if ( ctx == owner_) { + throw lock_error( static_cast< int >( std::errc::resource_deadlock_would_occur), + "boost fiber: a deadlock is detected"); + } else if ( nullptr == owner_) { + owner_ = ctx; + return true; + } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + ctx->wait_link( wait_queue_); + std::function< void() > func([&lk](){ + lk.unlock(); + }); + // suspend this fiber until notified or timed-out + if ( ! context::active()->wait_until( timeout_time, & func) ) { + // remove fiber from wait-queue + lk.lock(); + ctx->wait_unlink(); + return false; + } + BOOST_ASSERT( ! ctx->wait_is_linked() ); + return true; } void timed_mutex::unlock() { - BOOST_ASSERT( mutex_status::locked == state_); - BOOST_ASSERT( context::active() == owner_); + context * ctx = context::active(); detail::spinlock_lock lk( wait_queue_splk_); - context * ctx( nullptr); - if ( ! wait_queue_.empty() ) { - ctx = & wait_queue_.front(); - wait_queue_.pop_front(); - BOOST_ASSERT( nullptr != ctx); + if ( ctx != owner_) { + throw lock_error( static_cast< int >( std::errc::operation_not_permitted), + "boost fiber: no privilege to perform the operation"); } - lk.unlock(); - owner_ = nullptr; - state_.store( mutex_status::unlocked, std::memory_order_relaxed); - if ( nullptr != ctx) { + if ( ! 
wait_queue_.empty() ) { + context * ctx = & wait_queue_.front(); + wait_queue_.pop_front(); + owner_ = ctx; context::active()->set_ready( ctx); + } else { + owner_ = nullptr; + return; } } diff --git a/test/Jamfile.v2 b/test/Jamfile.v2 index 23e9d00e..518cb397 100644 --- a/test/Jamfile.v2 +++ b/test/Jamfile.v2 @@ -155,50 +155,50 @@ run test_future.cpp : cxx11_variadic_templates cxx14_initialized_lambda_captures ] ; -#run test_mutex_mt.cpp : -# : : -# [ requires cxx11_constexpr -# cxx11_decltype -# cxx11_deleted_functions -# cxx11_explicit_conversion_operators -# cxx11_hdr_tuple -# cxx11_lambdas -# cxx11_noexcept -# cxx11_nullptr -# cxx11_template_aliases -# cxx11_rvalue_references -# cxx11_variadic_macros -# cxx11_variadic_templates -# cxx14_initialized_lambda_captures ] ; -# -#run test_condition_mt.cpp : -#: : -#[ requires cxx11_constexpr -# cxx11_decltype -# cxx11_deleted_functions -# cxx11_explicit_conversion_operators -# cxx11_hdr_tuple -# cxx11_lambdas -# cxx11_noexcept -# cxx11_nullptr -# cxx11_template_aliases -# cxx11_rvalue_references -# cxx11_variadic_macros -# cxx11_variadic_templates -# cxx14_initialized_lambda_captures ] ; -# -#run test_future_mt.cpp : -#: : -#[ requires cxx11_constexpr -# cxx11_decltype -# cxx11_deleted_functions -# cxx11_explicit_conversion_operators -# cxx11_hdr_tuple -# cxx11_lambdas -# cxx11_noexcept -# cxx11_nullptr -# cxx11_template_aliases -# cxx11_rvalue_references -# cxx11_variadic_macros -# cxx11_variadic_templates -# cxx14_initialized_lambda_captures ] ; +run test_mutex_mt.cpp : + : : + [ requires cxx11_constexpr + cxx11_decltype + cxx11_deleted_functions + cxx11_explicit_conversion_operators + cxx11_hdr_tuple + cxx11_lambdas + cxx11_noexcept + cxx11_nullptr + cxx11_template_aliases + cxx11_rvalue_references + cxx11_variadic_macros + cxx11_variadic_templates + cxx14_initialized_lambda_captures ] ; + +run test_condition_mt.cpp : +: : +[ requires cxx11_constexpr + cxx11_decltype + cxx11_deleted_functions + 
cxx11_explicit_conversion_operators + cxx11_hdr_tuple + cxx11_lambdas + cxx11_noexcept + cxx11_nullptr + cxx11_template_aliases + cxx11_rvalue_references + cxx11_variadic_macros + cxx11_variadic_templates + cxx14_initialized_lambda_captures ] ; + +run test_future_mt.cpp : +: : +[ requires cxx11_constexpr + cxx11_decltype + cxx11_deleted_functions + cxx11_explicit_conversion_operators + cxx11_hdr_tuple + cxx11_lambdas + cxx11_noexcept + cxx11_nullptr + cxx11_template_aliases + cxx11_rvalue_references + cxx11_variadic_macros + cxx11_variadic_templates + cxx14_initialized_lambda_captures ] ;