From 4187c68f4e1726e26a0be122e85dbebe993efd0c Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sat, 5 Sep 2015 10:36:57 +0200 Subject: [PATCH] release mutex before signal condition-variable --- include/boost/fiber/bounded_channel.hpp | 40 ++++++---- .../fiber/future/detail/shared_state.hpp | 77 +++++++++++-------- include/boost/fiber/unbounded_channel.hpp | 32 ++++---- src/barrier.cpp | 1 + 4 files changed, 84 insertions(+), 66 deletions(-) diff --git a/include/boost/fiber/bounded_channel.hpp b/include/boost/fiber/bounded_channel.hpp index 4fbe82a7..0a084555 100644 --- a/include/boost/fiber/bounded_channel.hpp +++ b/include/boost/fiber/bounded_channel.hpp @@ -96,8 +96,9 @@ private: return queue_status::closed == state_; } - void close_() { + void close_( std::unique_lock< boost::fibers::mutex > & lk) { state_ = queue_status::closed; + lk.unlock(); not_empty_cond_.notify_all(); not_full_cond_.notify_all(); } @@ -115,7 +116,7 @@ private: } channel_op_status push_( typename node::ptr const& new_node, - std::unique_lock< boost::fibers::mutex > & lk ) { + std::unique_lock< boost::fibers::mutex > & lk) { if ( is_closed_() ) { return channel_op_status::closed; } @@ -124,10 +125,11 @@ private: not_full_cond_.wait( lk); } - return push_and_notify_( new_node); + return push_and_notify_( new_node, lk); } - channel_op_status try_push_( typename node::ptr const& new_node) { + channel_op_status try_push_( typename node::ptr const& new_node, + std::unique_lock< boost::fibers::mutex > & lk) { if ( is_closed_() ) { return channel_op_status::closed; } @@ -136,7 +138,7 @@ private: return channel_op_status::full; } - return push_and_notify_( new_node); + return push_and_notify_( new_node, lk); } template< typename Clock, typename Duration > @@ -153,17 +155,19 @@ private: } } - return push_and_notify_( new_node); + return push_and_notify_( new_node, lk); } - channel_op_status push_and_notify_( typename node::ptr const& new_node) { + channel_op_status push_and_notify_( typename node::ptr const& new_node, + std::unique_lock< boost::fibers::mutex > & lk) { try { push_tail_( new_node); + lk.unlock(); not_empty_cond_.notify_one(); return channel_op_status::success; } catch (...) { - close_(); + close_( lk); throw; } } @@ -174,15 +178,17 @@ private: ++count_; } - value_type value_pop_() { + value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) { BOOST_ASSERT( ! is_empty_() ); try { typename node::ptr old_head = pop_head_(); if ( size_() <= lwm_) { if ( lwm_ == hwm_) { + lk.unlock(); not_full_cond_.notify_one(); } else { + lk.unlock(); // more than one producer could be waiting // to push a value not_full_cond_.notify_all(); @@ -190,7 +196,7 @@ private: } return std::move( old_head->va); } catch (...) { - close_(); + close_( lk); throw; } } @@ -262,7 +268,7 @@ public: void close() { std::unique_lock< mutex > lk( mtx_); - close_(); + close_( lk); } channel_op_status push( value_type const& va) { @@ -315,14 +321,14 @@ public: typename node::ptr new_node( new ( alloc_.allocate( 1) ) node( va, alloc_) ); std::unique_lock< mutex > lk( mtx_); - return try_push_( new_node); + return try_push_( new_node, lk); } channel_op_status try_push( value_type && va) { typename node::ptr new_node( new ( alloc_.allocate( 1) ) node( std::forward< value_type >( va), alloc_) ); std::unique_lock< mutex > lk( mtx_); - return try_push_( new_node); + return try_push_( new_node, lk); } channel_op_status pop( value_type & va) { @@ -336,7 +342,7 @@ public: return channel_op_status::closed; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } @@ -351,7 +357,7 @@ public: throw logic_error("boost fiber: queue is closed"); } - return value_pop_(); + return value_pop_( lk); } channel_op_status try_pop( value_type & va) { @@ -371,7 +377,7 @@ public: return channel_op_status::empty; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } @@ -397,7 +403,7 @@ public: return channel_op_status::closed; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } }; diff --git a/include/boost/fiber/future/detail/shared_state.hpp b/include/boost/fiber/future/detail/shared_state.hpp index 517efbb0..4678ff43 100644 --- a/include/boost/fiber/future/detail/shared_state.hpp +++ b/include/boost/fiber/future/detail/shared_state.hpp @@ -43,39 +43,42 @@ private: optional< R > value_; std::exception_ptr except_; - void mark_ready_and_notify_() { + void mark_ready_and_notify_( std::unique_lock< mutex > & lk) { ready_ = true; + lk.unlock(); waiters_.notify_all(); } - void owner_destroyed_() { + void owner_destroyed_( std::unique_lock< mutex > & lk) { if ( ! ready_) { - set_exception_( std::make_exception_ptr( broken_promise() ) ); + set_exception_( + std::make_exception_ptr( broken_promise() ), + lk); } } - void set_value_( R const& value) { + void set_value_( R const& value, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } value_ = value; - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } - void set_value_( R && value) { + void set_value_( R && value, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } value_ = std::move( value); - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } - void set_exception_( std::exception_ptr except) { + void set_exception_( std::exception_ptr except, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } except_ = except; - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } R const& get_( std::unique_lock< mutex > & lk) { @@ -139,22 +142,22 @@ public: void owner_destroyed() { std::unique_lock< mutex > lk( mtx_); - owner_destroyed_(); + owner_destroyed_( lk); } void set_value( R const& value) { std::unique_lock< mutex > lk( mtx_); - set_value_( value); + set_value_( value, lk); } void set_value( R && value) { std::unique_lock< mutex > lk( mtx_); - set_value_( std::move( value) ); + set_value_( std::move( value), lk); } void set_exception( std::exception_ptr except) { std::unique_lock< mutex > lk( mtx_); - set_exception_( except); + set_exception_( except, lk); } R const& get() { @@ -210,31 +213,34 @@ private: R * value_; std::exception_ptr except_; - void mark_ready_and_notify_() { + void mark_ready_and_notify_( std::unique_lock< mutex > & lk) { ready_ = true; + lk.unlock(); waiters_.notify_all(); } - void owner_destroyed_() { + void owner_destroyed_( std::unique_lock< mutex > & lk) { if ( ! ready_) { - set_exception_( std::make_exception_ptr( broken_promise() ) ); + set_exception_( + std::make_exception_ptr( broken_promise() ), + lk); } } - void set_value_( R & value) { + void set_value_( R & value, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } value_ = & value; - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } - void set_exception_( std::exception_ptr except) { + void set_exception_( std::exception_ptr except, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } except_ = except; - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } R & get_( std::unique_lock< mutex > & lk) { @@ -269,7 +275,7 @@ private: } future_status wait_until_( std::unique_lock< mutex > & lk, - std::chrono::high_resolution_clock::time_point const& timeout_time) const { + std::chrono::high_resolution_clock::time_point const& timeout_time) const { while ( ! ready_) { cv_status st( waiters_.wait_until( lk, timeout_time) ); if ( cv_status::timeout == st && ! ready_) { @@ -298,17 +304,17 @@ public: void owner_destroyed() { std::unique_lock< mutex > lk( mtx_); - owner_destroyed_(); + owner_destroyed_( lk); } void set_value( R & value) { std::unique_lock< mutex > lk( mtx_); - set_value_( value); + set_value_( value, lk); } void set_exception( std::exception_ptr except) { std::unique_lock< mutex > lk( mtx_); - set_exception_( except); + set_exception_( except, lk); } R & get() { @@ -364,33 +370,36 @@ private: std::exception_ptr except_; inline - void mark_ready_and_notify_() { + void mark_ready_and_notify_( std::unique_lock< mutex > & lk) { ready_ = true; + lk.unlock(); waiters_.notify_all(); } inline - void owner_destroyed_() { + void owner_destroyed_( std::unique_lock< mutex > & lk) { if ( ! ready_) { - set_exception_( std::make_exception_ptr( broken_promise() ) ); + set_exception_( + std::make_exception_ptr( broken_promise() ), + lk); } } inline - void set_value_() { + void set_value_( std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } inline - void set_exception_( std::exception_ptr except) { + void set_exception_( std::exception_ptr except, std::unique_lock< mutex > & lk) { if ( ready_) { throw promise_already_satisfied(); } except_ = except; - mark_ready_and_notify_(); + mark_ready_and_notify_( lk); } inline @@ -457,19 +466,19 @@ public: inline void owner_destroyed() { std::unique_lock< mutex > lk( mtx_); - owner_destroyed_(); + owner_destroyed_( lk); } inline void set_value() { std::unique_lock< mutex > lk( mtx_); - set_value_(); + set_value_( lk); } inline void set_exception( std::exception_ptr except) { std::unique_lock< mutex > lk( mtx_); - set_exception_( except); + set_exception_( except, lk); } inline diff --git a/include/boost/fiber/unbounded_channel.hpp b/include/boost/fiber/unbounded_channel.hpp index 93cabb57..fb280d15 100644 --- a/include/boost/fiber/unbounded_channel.hpp +++ b/include/boost/fiber/unbounded_channel.hpp @@ -93,8 +93,9 @@ private: return queue_status::closed == state_; } - void close_() { + void close_( std::unique_lock< mutex > & lk) { state_ = queue_status::closed; + lk.unlock(); not_empty_cond_.notify_all(); } @@ -103,22 +104,24 @@ private: } channel_op_status push_( typename node::ptr const& new_node, - std::unique_lock< boost::fibers::mutex > & lk) { + std::unique_lock< mutex > & lk) { if ( is_closed_() ) { return channel_op_status::closed; } - return push_and_notify_( new_node); + return push_and_notify_( new_node, lk); } - channel_op_status push_and_notify_( typename node::ptr const& new_node) { + channel_op_status push_and_notify_( typename node::ptr const& new_node, + std::unique_lock< mutex > & lk) { try { push_tail_( new_node); + lk.unlock(); not_empty_cond_.notify_one(); return channel_op_status::success; } catch (...) { - close_(); + close_( lk); throw; } } @@ -128,14 +131,14 @@ private: tail_ = & new_node->nxt; } - value_type value_pop_() { + value_type value_pop_( std::unique_lock< mutex > & lk) { BOOST_ASSERT( ! is_empty_() ); try { typename node::ptr old_head = pop_head_(); return std::move( old_head->va); } catch (...) { - close_(); + close_( lk); throw; } } @@ -165,8 +168,7 @@ public: void close() { std::unique_lock< mutex > lk( mtx_); - - close_(); + close_( lk); } channel_op_status push( value_type const& va) { @@ -194,7 +196,7 @@ public: return channel_op_status::closed; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } @@ -209,7 +211,7 @@ public: throw logic_error("boost fiber: queue is closed"); } - return value_pop_(); + return value_pop_( lk); } channel_op_status try_pop( value_type & va) { @@ -229,19 +231,19 @@ public: return channel_op_status::empty; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, - std::chrono::duration< Rep, Period > const& timeout_duration) { + std::chrono::duration< Rep, Period > const& timeout_duration) { return pop_wait_until( va, std::chrono::high_resolution_clock::now() + timeout_duration); } template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, - std::chrono::time_point< Clock, Duration > const& timeout_time) { + std::chrono::time_point< Clock, Duration > const& timeout_time) { std::unique_lock< mutex > lk( mtx_); while ( ! is_closed_() && is_empty_() ) { @@ -253,7 +255,7 @@ public: return channel_op_status::closed; } - va = value_pop_(); + va = value_pop_( lk); return channel_op_status::success; } }; diff --git a/src/barrier.cpp b/src/barrier.cpp index 042904ef..89707912 100644 --- a/src/barrier.cpp +++ b/src/barrier.cpp @@ -36,6 +36,7 @@ barrier::wait() { if ( 0 == --current_) { cycle_ = ! cycle_; current_ = initial_; + lk.unlock(); // no pessimization cond_.notify_all(); return true; } else {