diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index bf8ddcf4..7f055765 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -31,10 +31,7 @@ lib boost_fiber barrier.cpp condition.cpp context.cpp - detail/fifo.cpp detail/spinlock.cpp - detail/terminated_queue.cpp - detail/waiting_queue.cpp fiber.cpp future.cpp interruption.cpp diff --git a/examples/priority.cpp b/examples/priority.cpp index 5e81b122..e18d3d5b 100644 --- a/examples/priority.cpp +++ b/examples/priority.cpp @@ -6,6 +6,7 @@ #include #include +#include #include class Verbose: public boost::noncopyable { @@ -68,14 +69,13 @@ private: //[priority_scheduler class priority_scheduler : public boost::fibers::sched_algorithm_with_properties< priority_props > { private: - // Much as we would like, we don't use std::priority_queue because it - // doesn't appear to provide any way to alter the priority (and hence - // queue position) of a particular item. - boost::fibers::context * head_; + typedef boost::fibers::detail::state_queue< boost::fibers::context > rqueue_t; + + rqueue_t rqueue_; public: priority_scheduler() : - head_( nullptr) { + rqueue_() { } // For a subclass of sched_algorithm_with_properties<>, it's important to @@ -93,19 +93,16 @@ public: // we're handed a new context*, put it at the end of the fibers // with that same priority. In other words: search for the first fiber // in the queue with LOWER priority, and insert before that one. - boost::fibers::context ** fp = & head_; - for ( ; * fp; fp = & ( * fp)->nxt) { - if ( properties( * fp).get_priority() < f_priority) { - /*< Use the - [member_link sched_algorithm_with_properties..properties] - method to access properties for any ['other] fiber. >*/ - break; + if ( rqueue_.empty() ) { + rqueue_.push_back( * f); + } else { + rqueue_t::iterator e( rqueue_.end() ); + for ( rqueue_t::iterator i( rqueue_.begin() ); i != e; ++i) { + if ( properties( & ( * i) ).get_priority() < f_priority) { + rqueue_.insert( i, * f); + } } } - // It doesn't matter whether we hit the end of the list or found - // another fiber with lower priority. Either way, insert f here. - f->nxt = * fp; /*< Note use of the [data_member_link context..nxt] member. >*/ - * fp = f; //<- std::cout << "awakened(" << props.name << "): "; @@ -118,14 +115,11 @@ public: of the next fiber to run. >>*/ virtual boost::fibers::context * pick_next() { // if ready queue is empty, just tell caller - if ( ! head_) { + if ( rqueue_.empty() ) { return nullptr; } - // Here we have at least one ready fiber. Unlink and return that. - boost::fibers::context * f = head_; - head_ = f->nxt; - f->nxt = nullptr; - + boost::fibers::context * f( & rqueue_.front() ); + rqueue_.pop_front(); //<- std::cout << "pick_next() resuming " << properties( f).name << ": "; describe_ready_queue(); @@ -136,11 +130,7 @@ public: /*<< You must override [member_link sched_algorithm_with_properties..ready_fibers] to inform the fiber manager of the size of your ready queue. >>*/ virtual std::size_t ready_fibers() const noexcept { - std::size_t count = 0; - for ( boost::fibers::context * f = head_; f; f = f->nxt) { - ++count; - } - return count; + return rqueue_.size(); } /*<< Overriding [member_link sched_algorithm_with_properties..property_change] @@ -160,12 +150,12 @@ public: // Find 'f' in the queue. Note that it might not be in our queue at // all, if caller is changing the priority of (say) the running fiber. bool found = false; - for ( boost::fibers::context ** fp = & head_; * fp; fp = & ( * fp)->nxt) { - if ( * fp == f) { + rqueue_t::iterator e( rqueue_.end() ); + for ( rqueue_t::iterator i( rqueue_.begin() ); i != e; ++i) { + if ( & ( * i) == f) { // found the passed fiber in our list -- unlink it found = true; - * fp = ( * fp)->nxt; - f->nxt = nullptr; + rqueue_.erase( i); break; } } @@ -195,12 +185,12 @@ public: //<- void describe_ready_queue() { - if ( ! head_) { + if ( rqueue_.empty() ) { std::cout << "[empty]"; } else { const char * delim = ""; - for ( boost::fibers::context * f = head_; f; f = f->nxt) { - priority_props & props( properties( f) ); + for ( boost::fibers::context & f : rqueue_) { + priority_props & props( properties( & f) ); std::cout << delim << props.name << '(' << props.get_priority() << ')'; delim = ", "; } diff --git a/examples/work_sharing.cpp b/examples/work_sharing.cpp index 87979346..a58bd5cb 100644 --- a/examples/work_sharing.cpp +++ b/examples/work_sharing.cpp @@ -91,7 +91,7 @@ void whatevah(char me) { std::thread::id my_thread = std::this_thread::get_id(); { std::ostringstream buffer; - buffer << "fiber " << me << " started on thread " << my_thread << '\n'; + //buffer << "fiber " << me << " started on thread " << my_thread << '\n'; std::cout << buffer.str() << std::flush; } for (unsigned i = 0; i < 5; ++i) { @@ -100,7 +100,7 @@ void whatevah(char me) { if (new_thread != my_thread) { my_thread = new_thread; std::ostringstream buffer; - buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; + //buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; std::cout << buffer.str() << std::flush; } } diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp index 144398f6..31cd7962 100644 --- a/include/boost/fiber/condition.hpp +++ b/include/boost/fiber/condition.hpp @@ -9,16 +9,16 @@ #include #include -#include #include #include #include +#include #include #include +#include #include -#include #include #include #include @@ -39,8 +39,10 @@ enum class cv_status { class BOOST_FIBERS_DECL condition { private: - detail::spinlock splk_; - std::deque< context * > waiting_; + typedef detail::wait_queue< context > wqueue_t; + + detail::spinlock splk_; + wqueue_t waiting_; public: condition(); @@ -68,10 +70,10 @@ public: // lock spinlock detail::spinlock_lock lk( splk_); - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); // store this fiber in waiting-queue // in order notify (resume) this fiber later - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // unlock external lt.unlock(); @@ -79,17 +81,13 @@ public: // suspend this fiber // locked spinlock will be released if this fiber // was stored inside manager's waiting-queue - context::active()->do_wait( lk); + f->do_wait( lk); // lock external again before returning lt.lock(); } catch (...) { detail::spinlock_lock lk( splk_); - std::deque< context * >::iterator i( std::find( waiting_.begin(), waiting_.end(), f) ); - if ( waiting_.end() != i) { - // remove fiber from waiting-list - waiting_.erase( i); - } + f->wait_unlink(); throw; } } @@ -105,7 +103,8 @@ public: // store this fiber in waiting-queue // in order notify (resume) this fiber later - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // unlock external lt.unlock(); @@ -113,15 +112,11 @@ public: // suspend this fiber // locked spinlock will be released if this fiber // was stored inside manager's waiting-queue - if ( ! context::active()->do_wait_until( timeout_time, lk) ) { + if ( ! f->do_wait_until( timeout_time, lk) ) { // this fiber was not notified before timeout // lock spinlock again detail::spinlock_lock lk( splk_); - std::deque< context * >::iterator i( std::find( waiting_.begin(), waiting_.end(), f) ); - if ( waiting_.end() != i) { - // remove fiber from waiting-list - waiting_.erase( i); - } + f->wait_unlink(); status = cv_status::timeout; } @@ -130,11 +125,7 @@ public: lt.lock(); } catch (...) { detail::spinlock_lock lk( splk_); - std::deque< context * >::iterator i( std::find( waiting_.begin(), waiting_.end(), f) ); - if ( waiting_.end() != i) { - // remove fiber from waiting-list - waiting_.erase( i); - } + f->wait_unlink(); throw; } diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index bb43a1cd..0b7ee273 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -24,10 +24,11 @@ #include #include +#include #include #include +#include #include -#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -39,8 +40,12 @@ namespace fibers { class fiber; class fiber_properties; +class scheduler; +struct sched_algorithm; -class BOOST_FIBERS_DECL context { +class BOOST_FIBERS_DECL context : public detail::state_hook, + public detail::yield_hook, + public detail::wait_hook { private: enum class fiber_status { ready = 0, @@ -99,6 +104,9 @@ private: std::chrono::steady_clock::time_point tp_; fiber_properties * properties_; + bool do_wait_until_( std::chrono::steady_clock::time_point const&, + detail::spinlock_lock &); + protected: virtual void deallocate() { } @@ -164,8 +172,6 @@ public: static void active( context * active) noexcept; - context * nxt; - // main fiber context() : use_count_( 1), // allocated on stack @@ -178,8 +184,7 @@ public: waiting_(), except_(), tp_( (std::chrono::steady_clock::time_point::max)() ), - properties_( nullptr), - nxt( nullptr) { + properties_( nullptr) { } // worker fiber @@ -223,8 +228,7 @@ public: waiting_(), except_(), tp_( (std::chrono::steady_clock::time_point::max)() ), - properties_( nullptr), - nxt( nullptr) { + properties_( nullptr) { } virtual ~context(); @@ -369,15 +373,17 @@ public: void do_wait( detail::spinlock_lock &); template< typename Clock, typename Duration > - bool do_wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time, + bool do_wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time_, detail::spinlock_lock & lk) { - return scheduler_->wait_until( this, timeout_time, lk); + std::chrono::steady_clock::time_point timeout_time( + detail::convert_tp( timeout_time_) ); + return do_wait_until_( timeout_time, lk); } template< typename Rep, typename Period > bool do_wait_for( std::chrono::duration< Rep, Period > const& timeout_duration, detail::spinlock_lock & lk) { - return scheduler_->wait_for( this, timeout_duration, lk); + return do_wait_until_( std::chrono::steady_clock::now() + timeout_duration, lk); } void do_yield(); @@ -390,11 +396,27 @@ public: template< typename Rep, typename Period > void do_wait_interval( std::chrono::duration< Rep, Period > const& wait_interval) noexcept { - scheduler_->wait_interval( wait_interval); + // wait_interval_( wait_interval); FIXME } std::chrono::steady_clock::duration do_wait_interval() noexcept; + bool state_is_linked() { + return detail::state_hook::is_linked(); + } + + bool yield_is_linked() { + return detail::yield_hook::is_linked(); + } + + void wait_unlink() { + detail::wait_hook::unlink(); + } + + bool wait_is_linked() { + return detail::wait_hook::is_linked(); + } + friend void intrusive_ptr_add_ref( context * f) { BOOST_ASSERT( nullptr != f); ++f->use_count_; diff --git a/include/boost/fiber/detail/fifo.hpp b/include/boost/fiber/detail/fifo.hpp deleted file mode 100644 index eb18bc3b..00000000 --- a/include/boost/fiber/detail/fifo.hpp +++ /dev/null @@ -1,72 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_FIBERS_DETAIL_FIFO_H -#define BOOST_FIBERS_DETAIL_FIFO_H - -#include -#include - -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { - -class context; - -namespace detail { - -class BOOST_FIBERS_DECL fifo { -public: - fifo() noexcept : - size_( 0), - head_( nullptr), - tail_( & head_) { - } - - fifo( fifo const&) = delete; - fifo & operator=( fifo const&) = delete; - - bool empty() const noexcept { - return nullptr == head_; - } - - std::size_t size() const noexcept { - return size_; - } - - void push( context * item) noexcept; - - context * pop() noexcept; - - void swap( fifo & other) { - std::swap( head_, other.head_); - std::swap( tail_, other.tail_); - } - -private: - std::size_t size_; - context * head_; - // tail_ points to the nxt field that contains the null that marks the end - // of the fifo. When the fifo is empty, tail_ points to head_. tail_ must - // never be null: it always points to a real context*. However, in - // normal use, (*tail_) is always null. - context ** tail_; -}; - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_DETAIL_FIFO_H diff --git a/include/boost/fiber/detail/queues.hpp b/include/boost/fiber/detail/queues.hpp new file mode 100644 index 00000000..471622d8 --- /dev/null +++ b/include/boost/fiber/detail/queues.hpp @@ -0,0 +1,67 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_DETAIL_QUEUES_H +#define BOOST_FIBERS_DETAIL_QUEUES_H + +#include +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +struct state_tag; +typedef intrusive::list_base_hook< + intrusive::tag< state_tag >, + intrusive::link_mode< + intrusive::safe_link + > +> state_hook; +template< typename T > +using state_queue = intrusive::list< + T, + intrusive::base_hook< state_hook > >; + +struct yield_tag; +typedef intrusive::list_base_hook< + intrusive::tag< yield_tag >, + intrusive::link_mode< + intrusive::safe_link + > +> yield_hook; +template< typename T > +using yield_queue = intrusive::list< + T, + intrusive::base_hook< yield_hook >, + intrusive::constant_time_size< false > >; + +struct wait_tag; +typedef intrusive::list_base_hook< + intrusive::tag< wait_tag >, + intrusive::link_mode< + intrusive::auto_unlink + > +> wait_hook; +template< typename T > +using wait_queue = intrusive::list< + T, + intrusive::base_hook< wait_hook >, + intrusive::constant_time_size< false > >; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAIL_QUEUES_H diff --git a/include/boost/fiber/detail/terminated_queue.hpp b/include/boost/fiber/detail/terminated_queue.hpp deleted file mode 100644 index 358b26a5..00000000 --- a/include/boost/fiber/detail/terminated_queue.hpp +++ /dev/null @@ -1,66 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_FIBERS_DETAIL_TERMINATED_QUEUE_H -#define BOOST_FIBERS_DETAIL_TERMINATED_QUEUE_H - -#include -#include - -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { - -class context; - -namespace detail { - -class BOOST_FIBERS_DECL terminated_queue { -public: - terminated_queue() noexcept : - head_( nullptr), - tail_( & head_) { - } - - ~terminated_queue() { - clear(); - } - - terminated_queue( terminated_queue const&) = delete; - terminated_queue & operator=( terminated_queue const&) = delete; - - void push( context * item) noexcept; - - void clear() noexcept; - - void swap( terminated_queue & other) { - std::swap( head_, other.head_); - std::swap( tail_, other.tail_); - } - -private: - context * head_; - // tail_ points to the nxt field that contains the null that marks the end - // of the terminated_queue. When the terminated_queue is empty, tail_ points to head_. tail_ must - // never be null: it always points to a real context*. However, in - // normal use, (*tail_) is always null. - context ** tail_; -}; - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_DETAIL_TERMINATED_QUEUE_H diff --git a/include/boost/fiber/detail/waiting_queue.hpp b/include/boost/fiber/detail/waiting_queue.hpp deleted file mode 100644 index 0c489185..00000000 --- a/include/boost/fiber/detail/waiting_queue.hpp +++ /dev/null @@ -1,71 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_FIBERS_DETAIL_WAITING_QUEUE_H -#define BOOST_FIBERS_DETAIL_WAITING_QUEUE_H - -#include - -#include -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { - -class context; -struct sched_algorithm; - -namespace detail { - -class BOOST_FIBERS_DECL waiting_queue { -public: - waiting_queue() noexcept : - head_( nullptr), - tail_( nullptr) { - } - - waiting_queue( waiting_queue const&) = delete; - waiting_queue & operator=( waiting_queue const&) = delete; - - bool empty() const noexcept { - return nullptr == head_; - } - - void push( context * item) noexcept; - - context * top() const noexcept { - BOOST_ASSERT( ! empty() ); - - return head_; - } - - void move_to( sched_algorithm *); - - void interrupt_all() noexcept; - - void swap( waiting_queue & other) noexcept { - std::swap( head_, other.head_); - std::swap( tail_, other.tail_); - } - -private: - context * head_; - context * tail_; -}; - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_DETAIL_WAITING_QUEUE_H diff --git a/include/boost/fiber/mutex.hpp b/include/boost/fiber/mutex.hpp index 83794a87..dba3353f 100644 --- a/include/boost/fiber/mutex.hpp +++ b/include/boost/fiber/mutex.hpp @@ -7,13 +7,12 @@ #ifndef BOOST_FIBERS_MUTEX_H #define BOOST_FIBERS_MUTEX_H -#include - #include -#include -#include #include +#include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -29,10 +28,12 @@ private: unlocked }; - detail::spinlock splk_; - mutex_status state_; - context::id owner_; - std::deque< context * > waiting_; + typedef detail::wait_queue< context > wqueue_t; + + detail::spinlock splk_; + mutex_status state_; + context::id owner_; + wqueue_t waiting_; bool lock_if_unlocked_(); diff --git a/include/boost/fiber/properties.hpp b/include/boost/fiber/properties.hpp index 254a027d..a5b41dc5 100644 --- a/include/boost/fiber/properties.hpp +++ b/include/boost/fiber/properties.hpp @@ -47,8 +47,8 @@ public: // pointer to its context. fiber_properties( context* f): fiber_( f), - sched_algo_( nullptr) - {} + sched_algo_( nullptr) { + } // We need a virtual destructor (hence a vtable) because fiber_properties // is stored polymorphically (as fiber_properties*) in context, and diff --git a/include/boost/fiber/recursive_mutex.hpp b/include/boost/fiber/recursive_mutex.hpp index 5b99e80d..c0ea0625 100644 --- a/include/boost/fiber/recursive_mutex.hpp +++ b/include/boost/fiber/recursive_mutex.hpp @@ -10,13 +10,13 @@ #define BOOST_FIBERS_RECURSIVE_MUTEX_H #include -#include #include -#include -#include #include +#include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -32,11 +32,13 @@ private: unlocked }; - detail::spinlock splk_; - mutex_status state_; - context::id owner_; - std::size_t count_; - std::deque< context * > waiting_; + typedef detail::wait_queue< context > wqueue_t; + + detail::spinlock splk_; + mutex_status state_; + context::id owner_; + std::size_t count_; + wqueue_t waiting_; bool lock_if_unlocked_(); diff --git a/include/boost/fiber/recursive_timed_mutex.hpp b/include/boost/fiber/recursive_timed_mutex.hpp index 11fb8632..b978559b 100644 --- a/include/boost/fiber/recursive_timed_mutex.hpp +++ b/include/boost/fiber/recursive_timed_mutex.hpp @@ -11,14 +11,14 @@ #include #include -#include #include +#include #include #include +#include #include -#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -34,11 +34,13 @@ private: unlocked }; - detail::spinlock splk_; - mutex_status state_; - context::id owner_; - std::size_t count_; - std::deque< context * > waiting_; + typedef detail::wait_queue< context > wqueue_t; + + detail::spinlock splk_; + mutex_status state_; + context::id owner_; + std::size_t count_; + wqueue_t waiting_; bool lock_if_unlocked_(); diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 36f83651..a2bf56ad 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -11,8 +11,9 @@ #include #include +#include #include -#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -25,7 +26,7 @@ class context; class BOOST_FIBERS_DECL round_robin : public sched_algorithm { private: - typedef detail::fifo rqueue_t; + typedef detail::state_queue< context > rqueue_t; rqueue_t rqueue_; diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index b5f55a24..1a9d767d 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -14,11 +14,11 @@ #include #include +#include #include #include +#include #include -#include -#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -30,23 +30,21 @@ namespace fibers { class context; struct sched_algorithm; -struct BOOST_FIBERS_DECL scheduler { +class BOOST_FIBERS_DECL scheduler { private: - typedef detail::waiting_queue wqueue_t; - typedef detail::terminated_queue tqueue_t; + typedef detail::state_queue< context > tqueue_t; + typedef detail::state_queue< context > wqueue_t; + typedef detail::yield_queue< context > yqueue_t; std::unique_ptr< sched_algorithm > sched_algo_; context * main_context_; wqueue_t wqueue_; tqueue_t tqueue_; + yqueue_t yqueue_; std::chrono::steady_clock::duration wait_interval_; void resume_( context *, context *); - bool wait_until_( context *, - std::chrono::steady_clock::time_point const&, - detail::spinlock_lock &); - public: scheduler( context *) noexcept; @@ -61,22 +59,9 @@ public: void wait( context *, detail::spinlock_lock &); - template< typename Clock, typename Duration > - bool wait_until( context * af, - std::chrono::time_point< Clock, Duration > const& timeout_time_, - detail::spinlock_lock & lk) { - std::chrono::steady_clock::time_point timeout_time( - detail::convert_tp( timeout_time_) ); - return wait_until_( af, timeout_time, lk); - } - - template< typename Rep, typename Period > - bool wait_for( context * af, - std::chrono::duration< Rep, Period > const& timeout_duration, - detail::spinlock_lock & lk) { - return wait_until_( - af, std::chrono::steady_clock::now() + timeout_duration, lk); - } + bool wait_until( context *, + std::chrono::steady_clock::time_point const&, + detail::spinlock_lock &); void yield( context *); diff --git a/include/boost/fiber/timed_mutex.hpp b/include/boost/fiber/timed_mutex.hpp index 282e00b4..318ba260 100644 --- a/include/boost/fiber/timed_mutex.hpp +++ b/include/boost/fiber/timed_mutex.hpp @@ -8,14 +8,14 @@ #define BOOST_FIBERS_TIMED_MUTEX_H #include -#include #include +#include #include #include +#include #include -#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -31,10 +31,12 @@ private: unlocked }; - detail::spinlock splk_; - mutex_status state_; - context::id owner_; - std::deque< context * > waiting_; + typedef detail::wait_queue< context > wqueue_t; + + detail::spinlock splk_; + mutex_status state_; + context::id owner_; + wqueue_t waiting_; bool lock_if_unlocked_(); diff --git a/src/condition.cpp b/src/condition.cpp index edf73ceb..94208af8 100644 --- a/src/condition.cpp +++ b/src/condition.cpp @@ -31,7 +31,7 @@ condition::notify_one() { detail::spinlock_lock lk( splk_); // get one waiting fiber if ( ! waiting_.empty() ) { - f = waiting_.front(); + f = & waiting_.front(); waiting_.pop_front(); } lk.unlock(); @@ -44,7 +44,7 @@ condition::notify_one() { void condition::notify_all() { - std::deque< context * > waiting; + wqueue_t waiting; detail::spinlock_lock lk( splk_); // get all waiting fibers @@ -52,11 +52,9 @@ condition::notify_all() { lk.unlock(); // notify all waiting fibers - while ( ! waiting.empty() ) { - context * f( waiting.front() ); - waiting.pop_front(); - BOOST_ASSERT( nullptr != f); - f->set_ready(); + for ( context & f : waiting) { + f.set_ready(); + // f->wait_unlink(); ? } } diff --git a/src/context.cpp b/src/context.cpp index 116692d7..a3be082f 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -10,6 +10,7 @@ #include "boost/fiber/exceptions.hpp" #include "boost/fiber/fiber.hpp" #include "boost/fiber/properties.hpp" +#include "boost/fiber/scheduler.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -144,6 +145,15 @@ context::set_properties( fiber_properties * props) { properties_ = props; } +bool +context::do_wait_until_( std::chrono::steady_clock::time_point const& time_point, + detail::spinlock_lock & lk) { + BOOST_ASSERT( nullptr != scheduler_); + BOOST_ASSERT( this == active_); + + return scheduler_->wait_until( this, time_point, lk); +} + void context::do_spawn( fiber const& f) { BOOST_ASSERT( nullptr != scheduler_); diff --git a/src/detail/fifo.cpp b/src/detail/fifo.cpp deleted file mode 100644 index 68f8411f..00000000 --- a/src/detail/fifo.cpp +++ /dev/null @@ -1,52 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#include "boost/fiber/detail/fifo.hpp" - -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace detail { - -void -fifo::push( context * item) noexcept { - BOOST_ASSERT( nullptr != item); - BOOST_ASSERT( nullptr == item->nxt); - - // * tail_ holds the null marking the end of the fifo. So we can extend - // the fifo by assigning to * tail_. - * tail_ = item; - // Advance tail_ to point to the new end marker. - tail_ = & item->nxt; - ++size_; -} - -context * -fifo::pop() noexcept { - BOOST_ASSERT( ! empty() ); - - context * item( head_); - head_ = head_->nxt; - if ( nullptr == head_) { - tail_ = & head_; - } - item->nxt = nullptr; - --size_; - return item; -} - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif diff --git a/src/detail/terminated_queue.cpp b/src/detail/terminated_queue.cpp deleted file mode 100644 index ab3681a2..00000000 --- a/src/detail/terminated_queue.cpp +++ /dev/null @@ -1,50 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#include "boost/fiber/detail/terminated_queue.hpp" - -#include - -#include "boost/fiber/context.hpp" - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace detail { - -void -terminated_queue::push( context * item) noexcept { - BOOST_ASSERT( nullptr != item); - BOOST_ASSERT( nullptr == item->nxt); - - // * tail_ holds the null marking the end of the terminated_queue. So we can extend - // the terminated_queue by assigning to * tail_. - * tail_ = item; - // Advance tail_ to point to the new end marker. - tail_ = & item->nxt; -} - -void -terminated_queue::clear() noexcept { - while ( nullptr != head_) { - context * item( head_); - head_ = head_->nxt; - if ( nullptr == head_) { - tail_ = & head_; - } - // should call ~context() - intrusive_ptr_release( item); - } -} - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif diff --git a/src/detail/waiting_queue.cpp b/src/detail/waiting_queue.cpp deleted file mode 100644 index b14ad1d8..00000000 --- a/src/detail/waiting_queue.cpp +++ /dev/null @@ -1,103 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#include "boost/fiber/detail/waiting_queue.hpp" - -#include -#include -#include - -#include -#include -#include - -#include "boost/fiber/algorithm.hpp" -#include "boost/fiber/detail/config.hpp" -#include "boost/fiber/context.hpp" - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace detail { - -void -waiting_queue::push( context * item) noexcept { - BOOST_ASSERT( nullptr != item); - BOOST_ASSERT( nullptr == item->nxt); - - if ( nullptr == head_) { - head_ = tail_ = item; - } else { - tail_->nxt = item; - tail_ = tail_->nxt; - } -} - -void -waiting_queue::move_to( sched_algorithm * sched_algo) { - BOOST_ASSERT( nullptr != sched_algo); - - std::chrono::steady_clock::time_point now( - std::chrono::steady_clock::now() ); - - context * prev = head_; - for ( context * f( head_); nullptr != f;) { - BOOST_ASSERT( ! f->is_running() ); - BOOST_ASSERT( ! f->is_terminated() ); - - // set fiber to state_ready if deadline was reached - // set fiber to state_ready if interruption was requested - if ( f->time_point() <= now || f->interruption_requested() ) { - f->set_ready(); - } - - if ( ! f->is_ready() ) { - prev = f; - f = f->nxt; - } else { - if ( head_ == f) { - head_ = f->nxt; - prev = head_; - context * item = f; - f = head_; - if ( nullptr == head_) { - tail_ = head_; - } - item->nxt = nullptr; - // Pass the newly-unlinked context* to sched_algo. - item->time_point_reset(); - sched_algo->awakened( item); - } else { - prev->nxt = f->nxt; - if ( nullptr == prev->nxt) { - tail_ = prev; - } - context * item = f; - f = f->nxt; - item->nxt = nullptr; - // Pass the newly-unlinked context* to sched_algo. - item->time_point_reset(); - sched_algo->awakened( item); - } - } - } -} - -void -waiting_queue::interrupt_all() noexcept { - for ( context * f( head_); nullptr != f; f = f->nxt) { - f->request_interruption( true); - } -} - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif diff --git a/src/mutex.cpp b/src/mutex.cpp index fd8a5877..5d2f1438 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -57,8 +57,8 @@ mutex::lock() { } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber context::active()->do_wait( lk); @@ -88,7 +88,7 @@ mutex::unlock() { detail::spinlock_lock lk( splk_); context * f( nullptr); if ( ! waiting_.empty() ) { - f = waiting_.front(); + f = & waiting_.front(); waiting_.pop_front(); BOOST_ASSERT( nullptr != f); } diff --git a/src/recursive_mutex.cpp b/src/recursive_mutex.cpp index 0e5bc13e..f739a077 100644 --- a/src/recursive_mutex.cpp +++ b/src/recursive_mutex.cpp @@ -63,8 +63,8 @@ recursive_mutex::lock() { } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber context::active()->do_wait( lk); @@ -95,7 +95,7 @@ recursive_mutex::unlock() { context * f( nullptr); if ( 0 == --count_) { if ( ! waiting_.empty() ) { - f = waiting_.front(); + f = & waiting_.front(); waiting_.pop_front(); BOOST_ASSERT( nullptr != f); } diff --git a/src/recursive_timed_mutex.cpp b/src/recursive_timed_mutex.cpp index 82a3b724..1bb1fae2 100644 --- a/src/recursive_timed_mutex.cpp +++ b/src/recursive_timed_mutex.cpp @@ -63,8 +63,8 @@ recursive_timed_mutex::lock() { } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber context::active()->do_wait( lk); @@ -102,17 +102,13 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber until notified or timed-out if ( ! context::active()->do_wait_until( timeout_time, lk) ) { lk.lock(); - std::deque< context * >::iterator i( std::find( waiting_.begin(), waiting_.end(), f) ); - if ( waiting_.end() != i) { - // remove fiber from waiting-list - waiting_.erase( i); - } + f->wait_unlink(); lk.unlock(); return false; } @@ -128,7 +124,7 @@ recursive_timed_mutex::unlock() { context * f( nullptr); if ( 0 == --count_) { if ( ! waiting_.empty() ) { - f = waiting_.front(); + f = & waiting_.front(); waiting_.pop_front(); BOOST_ASSERT( nullptr != f); } diff --git a/src/round_robin.cpp b/src/round_robin.cpp index cb1f3dc6..124e55de 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -8,8 +8,6 @@ #include -#include "boost/fiber/context.hpp" - #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX #endif @@ -21,14 +19,16 @@ void round_robin::awakened( context * f) { BOOST_ASSERT( nullptr != f); - rqueue_.push( f); + BOOST_ASSERT( ! f->state_is_linked() ); + rqueue_.push_back( * f); } context * round_robin::pick_next() { context * victim( nullptr); if ( ! rqueue_.empty() ) { - victim = rqueue_.pop(); + victim = & rqueue_.front(); + rqueue_.pop_front(); BOOST_ASSERT( nullptr != victim); } return victim; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 51c86e52..514258aa 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -14,7 +14,6 @@ #include "boost/fiber/algorithm.hpp" #include "boost/fiber/exceptions.hpp" -#include "boost/fiber/context.hpp" #include "boost/fiber/interruption.hpp" #include "boost/fiber/round_robin.hpp" @@ -30,6 +29,7 @@ scheduler::scheduler( context * main_context) noexcept : main_context_( main_context), wqueue_(), tqueue_(), + yqueue_(), wait_interval_( std::chrono::milliseconds( 10) ) { } @@ -39,10 +39,35 @@ scheduler::~scheduler() noexcept { // NOTE: at this stage the fibers in the waiting-queue // can only be detached fibers // interrupt all waiting fibers (except main-fiber) - wqueue_.interrupt_all(); + for ( context & f : wqueue_) { + f.request_interruption( true); + } // move all fibers which are ready (state_ready) // from waiting-queue to the runnable-queue - wqueue_.move_to( sched_algo_.get() ); + std::chrono::steady_clock::time_point now( + std::chrono::steady_clock::now() ); + + wqueue_t::iterator e = wqueue_.end(); + for ( wqueue_t::iterator i = wqueue_.begin(); i != e;) { + context * f = & ( * i); + BOOST_ASSERT( ! f->is_running() ); + BOOST_ASSERT( ! f->is_terminated() ); + + // set fiber to state_ready if deadline was reached + // set fiber to state_ready if interruption was requested + if ( f->time_point() <= now || f->interruption_requested() ) { + f->set_ready(); + } + + if ( f->is_ready() ) { + i = wqueue_.erase( i); + // Pass the newly-unlinked context* to sched_algo. + f->time_point_reset(); + sched_algo_->awakened( f); + } else { + ++i; + } + } // pop new fiber from ready-queue context * f( sched_algo_->pick_next() ); if ( f) { @@ -56,7 +81,8 @@ scheduler::~scheduler() noexcept { // set main-fiber to state_waiting main_context_->set_waiting(); // push main-fiber to waiting-queue - wqueue_.push( main_context_); + BOOST_ASSERT( ! main_context_->state_is_linked() ); + wqueue_.push_back( * main_context_); // resume fiber f resume_( main_context_, f); } else if ( wqueue_.empty() ) { @@ -88,35 +114,13 @@ scheduler::resume_( context * af, context * f) { context::active( f); // push terminated fibers to terminated-queue if ( af->is_terminated() ) { - tqueue_.push( af); + BOOST_ASSERT( ! af->state_is_linked() ); + tqueue_.push_back( * af); } // resume active-fiber == f f->resume(); } -bool -scheduler::wait_until_( context * af, - std::chrono::steady_clock::time_point const& timeout_time, - detail::spinlock_lock & lk) { - BOOST_ASSERT( nullptr != af); - BOOST_ASSERT( context::active() == af); - BOOST_ASSERT( af->is_running() ); - // set active-fiber to state_waiting - af->set_waiting(); - // release lock - lk.unlock(); - // push active-fiber to waiting-queue - af->time_point( timeout_time); - wqueue_.push( af); - // switch to another fiber - run( af); - // fiber has been resumed - // check if fiber was interrupted - this_fiber::interruption_point(); - // check if timeout has reached - return std::chrono::steady_clock::now() < timeout_time; -} - void scheduler::spawn( context * f) { BOOST_ASSERT( nullptr != f); @@ -131,17 +135,56 @@ scheduler::run( context * af) { BOOST_ASSERT( nullptr != af); BOOST_ASSERT( context::active() == af); for (;;) { - // move all fibers which are ready (state_ready) - // from waiting-queue to the runnable-queue - wqueue_.move_to( sched_algo_.get() ); + { + // move all fibers in yield-queue to ready-queue + yqueue_t::iterator e = yqueue_.end(); + for ( yqueue_t::iterator i = yqueue_.begin(); i != e;) { + context * f = & ( * i); + BOOST_ASSERT( f->is_ready() ); + + i = yqueue_.erase( i); + sched_algo_->awakened( f); + } + } + { + // move all fibers which are ready (state_ready) + // from waiting-queue to the ready-queue + std::chrono::steady_clock::time_point now( + std::chrono::steady_clock::now() ); + wqueue_t::iterator e = wqueue_.end(); + for ( wqueue_t::iterator i = wqueue_.begin(); i != e;) { + context * f = & ( * i); + BOOST_ASSERT( ! f->is_running() ); + BOOST_ASSERT( ! f->is_terminated() ); + // set fiber to state_ready if deadline was reached + // set fiber to state_ready if interruption was requested + if ( f->time_point() <= now || f->interruption_requested() ) { + f->set_ready(); + } + if ( f->is_ready() ) { + i = wqueue_.erase( i); + // Pass the newly-unlinked context* to sched_algo. + f->time_point_reset(); + sched_algo_->awakened( f); + } else { + ++i; + } + } + } // pop new fiber from ready-queue context * f( sched_algo_->pick_next() ); if ( f) { BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue"); // resume fiber f resume_( af, f); + // FIXME +#if 0 // destroy terminated fibers from terminated-queue - tqueue_.clear(); + for ( tqueue_t::iterator i = tqueue_.begin(); i != e;) { + //BOOST_ASSERT( i->is_terminated() ); + //i = tqueue_.erase( i); + } +#endif return; } else { // no fibers ready to run; the thread should sleep @@ -160,6 +203,30 @@ scheduler::wait( context * af, detail::spinlock_lock & lk) { lk); } +bool +scheduler::wait_until( context * af, + std::chrono::steady_clock::time_point const& timeout_time, + detail::spinlock_lock & lk) { + BOOST_ASSERT( nullptr != af); + BOOST_ASSERT( context::active() == af); + BOOST_ASSERT( af->is_running() ); + // set active-fiber to state_waiting + af->set_waiting(); + // release lock + lk.unlock(); + // push active-fiber to waiting-queue + af->time_point( timeout_time); + BOOST_ASSERT( ! af->state_is_linked() ); + wqueue_.push_back( * af); + // switch to another fiber + run( af); + // fiber has been resumed + // check if fiber was interrupted + this_fiber::interruption_point(); + // check if timeout has reached + return std::chrono::steady_clock::now() < timeout_time; +} + void scheduler::yield( context * af) { BOOST_ASSERT( nullptr != af); @@ -167,8 +234,9 @@ scheduler::yield( context * af) { BOOST_ASSERT( af->is_running() ); // set active-fiber to state_waiting af->set_ready(); - // push active-fiber to wqueue_ - wqueue_.push( af); + // push active-fiber to yqueue_ + BOOST_ASSERT( ! af->yield_is_linked() ); + yqueue_.push_back( * af); // switch to another fiber run( af); // fiber has been resumed @@ -186,7 +254,8 @@ scheduler::join( context * af, context * f) { // set active-fiber to state_waiting af->set_waiting(); // push active-fiber to waiting-queue - wqueue_.push( af); + BOOST_ASSERT( ! af->state_is_linked() ); + wqueue_.push_back( * af); // add active-fiber to joinig-list of f if ( ! f->join( af) ) { // f must be already terminated therefore we set diff --git a/src/timed_mutex.cpp b/src/timed_mutex.cpp index 835cbd82..81482e35 100644 --- a/src/timed_mutex.cpp +++ b/src/timed_mutex.cpp @@ -57,8 +57,8 @@ timed_mutex::lock() { } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber context::active()->do_wait( lk); @@ -96,17 +96,13 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo } // store this fiber in order to be notified later - BOOST_ASSERT( waiting_.end() == std::find( waiting_.begin(), waiting_.end(), f) ); - waiting_.push_back( f); + BOOST_ASSERT( ! f->wait_is_linked() ); + waiting_.push_back( * f); // suspend this fiber until notified or timed-out if ( ! context::active()->do_wait_until( timeout_time, lk) ) { lk.lock(); - std::deque< context * >::iterator i( std::find( waiting_.begin(), waiting_.end(), f) ); - if ( waiting_.end() != i) { - // remove fiber from waiting-list - waiting_.erase( i); - } + f->wait_unlink(); lk.unlock(); return false; } @@ -121,7 +117,7 @@ timed_mutex::unlock() { detail::spinlock_lock lk( splk_); context * f( nullptr); if ( ! waiting_.empty() ) { - f = waiting_.front(); + f = & waiting_.front(); waiting_.pop_front(); BOOST_ASSERT( nullptr != f); }