From 9f7f74f6629a892ea4a024c77a7b77b75beb0bbf Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sat, 15 Mar 2014 19:16:13 +0100 Subject: [PATCH] use symmetric_coroutine<>::yield_type::operator()() --- examples/segmented_stack.cpp | 2 +- include/boost/fiber/algorithm.hpp | 2 +- include/boost/fiber/asio/round_robin.hpp | 2 +- include/boost/fiber/detail/worker_fiber.hpp | 82 ++++++++-- include/boost/fiber/detail/worker_object.hpp | 119 +++------------ include/boost/fiber/fiber.hpp | 48 +++--- include/boost/fiber/round_robin.hpp | 6 +- performance/preallocated_stack_allocator.hpp | 13 +- src/asio/round_robin.cpp | 6 +- src/detail/worker_fiber.cpp | 70 +++++---- src/round_robin.cpp | 150 +++++++++++-------- test/test_barrier.cpp | 4 - test/test_fiber.cpp | 8 +- test/test_mutex.cpp | 2 - 14 files changed, 256 insertions(+), 258 deletions(-) diff --git a/examples/segmented_stack.cpp b/examples/segmented_stack.cpp index 1e11cf4e..8859f000 100644 --- a/examples/segmented_stack.cpp +++ b/examples/segmented_stack.cpp @@ -56,7 +56,7 @@ int main( int argc, char * argv[]) std::cout << "application should not fail" << std::endl; #else std::cout << "using standard stacks: allocates " << count << " * 4kB == " << 4 * count << "kB on stack, "; - std::cout << "initial stack size = " << boost::coroutines::stack_allocator::default_stacksize() / 1024 << "kB" << std::endl; + std::cout << "initial stack size = " << boost::coroutines::stack_allocator::traits_type::default_size() / 1024 << "kB" << std::endl; std::cout << "application might fail" << std::endl; #endif diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index 1444986a..ea9cc356 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -39,7 +39,7 @@ struct algorithm : private noncopyable virtual detail::worker_fiber::ptr_t active() BOOST_NOEXCEPT = 0; - virtual bool run() = 0; + virtual void run() = 0; virtual void wait( unique_lock< detail::spinlock > &) = 0; virtual bool wait_until( clock_type::time_point const&, diff --git a/include/boost/fiber/asio/round_robin.hpp b/include/boost/fiber/asio/round_robin.hpp index 4faaf7aa..45a2670f 100644 --- a/include/boost/fiber/asio/round_robin.hpp +++ b/include/boost/fiber/asio/round_robin.hpp @@ -79,7 +79,7 @@ public: detail::worker_fiber::ptr_t active() BOOST_NOEXCEPT { return active_fiber_; } - bool run(); + void run(); void wait( unique_lock< detail::spinlock > &); bool wait_until( clock_type::time_point const&, diff --git a/include/boost/fiber/detail/worker_fiber.hpp b/include/boost/fiber/detail/worker_fiber.hpp index cdc8d907..b293c7f9 100644 --- a/include/boost/fiber/detail/worker_fiber.hpp +++ b/include/boost/fiber/detail/worker_fiber.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -40,6 +42,8 @@ namespace boost { namespace fibers { namespace detail { +namespace coro = boost::coroutines; + class BOOST_FIBERS_DECL worker_fiber : public fiber_base { public: @@ -74,21 +78,30 @@ private: }; typedef std::map< uintptr_t, fss_data > fss_data_t; + typedef coro::symmetric_coroutine< + void + > coro_t; fss_data_t fss_data_; + void trampoline_( coro_t::yield_type &); + protected: - atomic< state_t > state_; - atomic< int > flags_; - atomic< int > priority_; - exception_ptr except_; - spinlock splk_; - std::vector< ptr_t > waiting_; + coro_t::yield_type * callee_; + coro_t::call_type caller_; + atomic< state_t > state_; + atomic< int > flags_; + atomic< int > priority_; + exception_ptr except_; + spinlock splk_; + std::vector< ptr_t > waiting_; void release(); + virtual void run() = 0; + public: - worker_fiber(); + worker_fiber( attributes const&); virtual ~worker_fiber(); @@ -130,13 +143,29 @@ public: bool is_waiting() const BOOST_NOEXCEPT { return WAITING == state_; } - void set_terminated() BOOST_NOEXCEPT; + void set_terminated() BOOST_NOEXCEPT + { + state_t previous = state_.exchange( TERMINATED); + BOOST_ASSERT( RUNNING == previous); + } - void set_ready() BOOST_NOEXCEPT; + void set_ready() BOOST_NOEXCEPT + { + state_t previous = state_.exchange( READY); + BOOST_ASSERT( WAITING == previous || RUNNING == previous || READY == previous); + } - void set_running() BOOST_NOEXCEPT; + void set_running() BOOST_NOEXCEPT + { + state_t previous = state_.exchange( RUNNING); + BOOST_ASSERT( READY == previous); + } - void set_waiting() BOOST_NOEXCEPT; + void set_waiting() BOOST_NOEXCEPT + { + state_t previous = state_.exchange( WAITING); + BOOST_ASSERT( RUNNING == previous); + } void * get_fss_data( void const* vp) const; @@ -151,9 +180,36 @@ public: void rethrow() const; - virtual void resume() = 0; + void resume( worker_fiber * f) + { + if ( 0 == f) + { + BOOST_ASSERT( caller_); + BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm - virtual void suspend() = 0; + // called from main-fiber + caller_(); + } + else + { + // caller from worker-fiber f + BOOST_ASSERT( caller_); + BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm + BOOST_ASSERT( f->callee_); + + ( * f->callee_)( caller_); + } + } + + void suspend() + { + BOOST_ASSERT( callee_); + BOOST_ASSERT( * callee_); + + ( * callee_)(); + + BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm + } }; }}} diff --git a/include/boost/fiber/detail/worker_object.hpp b/include/boost/fiber/detail/worker_object.hpp index 4118a467..83dc778d 100644 --- a/include/boost/fiber/detail/worker_object.hpp +++ b/include/boost/fiber/detail/worker_object.hpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -35,81 +34,19 @@ namespace boost { namespace fibers { namespace detail { -namespace coro = boost::coroutines; - -template< typename Fn, typename StackAllocator, typename Allocator > +template< typename Fn, typename Allocator > class worker_object : public worker_fiber { public: typedef typename Allocator::template rebind< - worker_object< Fn, StackAllocator, Allocator > - >::other allocator_t; + worker_object< Fn, Allocator > + >::other allocator_t; -private: - typedef worker_fiber base_t; - typedef coro::symmetric_coroutine< - void, StackAllocator - > coro_t; - - Fn fn_; - typename coro_t::yield_type * callee_; - typename coro_t::call_type caller_; - allocator_t alloc_; - - static void destroy_( allocator_t & alloc, worker_object * p) - { - alloc.destroy( p); - alloc.deallocate( p, 1); - } - - worker_object( worker_object &); - worker_object & operator=( worker_object const&); - - void trampoline_( typename coro_t::yield_type & yield) - { - BOOST_ASSERT( yield); - BOOST_ASSERT( ! is_terminated() ); - - callee_ = & yield; - set_running(); - suspend(); - - try - { - BOOST_ASSERT( is_running() ); - fn_(); - BOOST_ASSERT( is_running() ); - } - catch ( coro::detail::forced_unwind const&) - { - set_terminated(); - release(); - throw; - } - catch ( fiber_interrupted const&) - { except_ = current_exception(); } - catch (...) - { std::terminate(); } - - set_terminated(); - release(); - suspend(); - - BOOST_ASSERT_MSG( false, "fiber already terminated"); - } - -public: #ifndef BOOST_NO_RVALUE_REFERENCES worker_object( Fn && fn, attributes const& attrs, - StackAllocator const& stack_alloc, - allocator_t const& alloc) : - base_t(), + allocator_t const& alloc) : + base_t( attrs), fn_( forward< Fn >( fn) ), - callee_( 0), - caller_( - boost::bind( & worker_object::trampoline_, this, _1), - attrs, - stack_alloc), alloc_( alloc) { BOOST_ASSERT( caller_); @@ -124,15 +61,9 @@ public: } #else worker_object( Fn fn, attributes const& attrs, - StackAllocator const& stack_alloc, - allocator_t const& alloc) : - base_t(), + allocator_t const& alloc) : + base_t( attrs), fn_( fn), - callee_( 0), - caller_( - boost::bind( & worker_object::trampoline_, this, _1), - attrs, - stack_alloc), alloc_( alloc) { BOOST_ASSERT( caller_); @@ -147,15 +78,9 @@ public: } worker_object( BOOST_RV_REF( Fn) fn, attributes const& attrs, - StackAllocator const& stack_alloc, - allocator_t const& alloc) : - base_t(), + allocator_t const& alloc) : + base_t( attrs), fn_( fn), - callee_( 0), - caller_( - boost::bind( & worker_object::trampoline_, this, _1), - attrs, - stack_alloc), alloc_( alloc) { BOOST_ASSERT( caller_); @@ -173,23 +98,23 @@ public: void deallocate_object() { destroy_( alloc_, this); } - void resume() - { - BOOST_ASSERT( caller_); - BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm +private: + typedef worker_fiber base_t; - caller_(); + Fn fn_; + allocator_t alloc_; + + static void destroy_( allocator_t & alloc, worker_object * p) + { + alloc.destroy( p); + alloc.deallocate( p, 1); } - void suspend() - { - BOOST_ASSERT( callee_); - BOOST_ASSERT( * callee_); + worker_object( worker_object &); + worker_object & operator=( worker_object const&); - ( * callee_)(); - - BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm - } + void run() + { fn_(); } }; }}} diff --git a/include/boost/fiber/fiber.hpp b/include/boost/fiber/fiber.hpp index fc5bb891..55e3f64b 100644 --- a/include/boost/fiber/fiber.hpp +++ b/include/boost/fiber/fiber.hpp @@ -78,12 +78,12 @@ public: impl_() { typedef detail::worker_object< - fiber_fn, stack_allocator, std::allocator< fiber > + fiber_fn, std::allocator< fiber > > object_t; object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, a) ); start_fiber_(); } @@ -94,12 +94,12 @@ public: impl_() { typedef detail::worker_object< - fiber_fn, StackAllocator, std::allocator< fiber > + fiber_fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, a) ); start_fiber_(); } @@ -110,12 +110,12 @@ public: impl_() { typedef detail::worker_object< - fiber_fn, StackAllocator, Allocator + fiber_fn, Allocator > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< fiber_fn >( fn), attr, a) ); start_fiber_(); } #endif @@ -126,12 +126,12 @@ public: impl_() { typedef detail::worker_object< - Fn, stack_allocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, a) ); start_fiber_(); } @@ -142,12 +142,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, a) ); start_fiber_(); } @@ -158,12 +158,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, Allocator + Fn, Allocator > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( forward< Fn >( fn), attr, a) ); start_fiber_(); } #else @@ -174,12 +174,12 @@ public: impl_() { typedef detail::worker_object< - Fn, stack_allocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } @@ -190,12 +190,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } @@ -206,12 +206,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, Allocator + Fn, Allocator > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } @@ -222,12 +222,12 @@ public: impl_() { typedef detail::worker_object< - Fn, stack_allocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } @@ -238,12 +238,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, std::allocator< fiber > + Fn, std::allocator< fiber > > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } @@ -254,12 +254,12 @@ public: impl_() { typedef detail::worker_object< - Fn, StackAllocator, Allocator + Fn, Allocator > object_t; typename object_t::allocator_t a( alloc); impl_ = ptr_t( // placement new - ::new( a.allocate( 1) ) object_t( fn, attr, stack_alloc, a) ); + ::new( a.allocate( 1) ) object_t( fn, attr, a) ); start_fiber_(); } #endif diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 07ebb7aa..f8bb8c93 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -77,6 +77,10 @@ private: rqueue_t rqueue_; detail::main_fiber mn_; + detail::worker_fiber::ptr_t pick_next_(); + + void resume_( detail::worker_fiber::ptr_t const&); + public: round_robin() BOOST_NOEXCEPT; @@ -91,7 +95,7 @@ public: detail::worker_fiber::ptr_t active() BOOST_NOEXCEPT { return active_fiber_; } - bool run(); + void run(); void wait( unique_lock< detail::spinlock > &); bool wait_until( clock_type::time_point const&, diff --git a/performance/preallocated_stack_allocator.hpp b/performance/preallocated_stack_allocator.hpp index a2db1436..dcdb5e99 100644 --- a/performance/preallocated_stack_allocator.hpp +++ b/performance/preallocated_stack_allocator.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -33,7 +33,7 @@ public: stack_ctx_() { boost::coroutines::standard_stack_allocator allocator; - allocator.allocate( stack_ctx_, default_stacksize() ); + allocator.allocate( stack_ctx_, boost::coroutines::stack_allocator::traits_type::default_size() ); } void allocate( boost::coroutines::stack_context & ctx, std::size_t size) @@ -45,15 +45,6 @@ public: void deallocate( boost::coroutines::stack_context & ctx) { } - - static std::size_t maximum_stacksize() - { return boost::coroutines::standard_stack_allocator::maximum_stacksize(); } - - static std::size_t default_stacksize() - { return boost::coroutines::standard_stack_allocator::default_stacksize(); } - - static std::size_t minimum_stacksize() - { return boost::coroutines::standard_stack_allocator::minimum_stacksize(); } }; #ifdef BOOST_HAS_ABI_HEADERS diff --git a/src/asio/round_robin.cpp b/src/asio/round_robin.cpp index ac049ff4..fdd83369 100644 --- a/src/asio/round_robin.cpp +++ b/src/asio/round_robin.cpp @@ -58,7 +58,7 @@ round_robin::spawn( detail::worker_fiber::ptr_t const& f) // set active fiber to state_running active_fiber_->set_running(); // resume active fiber - active_fiber_->resume(); + active_fiber_->resume( 0); // fiber is resumed BOOST_ASSERT( f == active_fiber_); @@ -77,7 +77,7 @@ round_robin::evaluate_( detail::worker_fiber::ptr_t const& f) { else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); } -bool +void round_robin::run() { wqueue_t wqueue; @@ -106,7 +106,7 @@ round_robin::run() // exchange local with global waiting queue wqueue_.swap( wqueue); - return 0 < io_svc_.poll_one(); + io_svc_.poll_one(); } void diff --git a/src/detail/worker_fiber.cpp b/src/detail/worker_fiber.cpp index 566c3646..9e04a183 100644 --- a/src/detail/worker_fiber.cpp +++ b/src/detail/worker_fiber.cpp @@ -13,7 +13,7 @@ #include #include "boost/fiber/detail/scheduler.hpp" -#include +#include "boost/fiber/exceptions.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -23,9 +23,47 @@ namespace boost { namespace fibers { namespace detail { -worker_fiber::worker_fiber() : +void +worker_fiber::trampoline_( coro_t::yield_type & yield) +{ + BOOST_ASSERT( yield); + BOOST_ASSERT( ! is_terminated() ); + + callee_ = & yield; + set_running(); + suspend(); + + try + { + BOOST_ASSERT( is_running() ); + run(); + BOOST_ASSERT( is_running() ); + } + catch ( coro::detail::forced_unwind const&) + { + set_terminated(); + release(); + throw; + } + catch ( fiber_interrupted const&) + { except_ = current_exception(); } + catch (...) + { std::terminate(); } + + set_terminated(); + release(); + suspend(); + + BOOST_ASSERT_MSG( false, "fiber already terminated"); +} + +worker_fiber::worker_fiber( attributes const& attrs) : fiber_base(), fss_data_(), + callee_( 0), + caller_( + boost::bind( & worker_fiber::trampoline_, this, _1), + attrs), state_( READY), flags_( 0), priority_( 0), @@ -96,34 +134,6 @@ worker_fiber::thread_affinity( bool req) BOOST_NOEXCEPT flags_ &= ~flag_thread_affinity; } -void -worker_fiber::set_terminated() BOOST_NOEXCEPT -{ - state_t previous = state_.exchange( TERMINATED); - BOOST_ASSERT( RUNNING == previous); -} - -void -worker_fiber::set_ready() BOOST_NOEXCEPT -{ - state_t previous = state_.exchange( READY); - BOOST_ASSERT( WAITING == previous || RUNNING == previous || READY == previous); -} - -void -worker_fiber::set_running() BOOST_NOEXCEPT -{ - state_t previous = state_.exchange( RUNNING); - BOOST_ASSERT( READY == previous); -} - -void -worker_fiber::set_waiting() BOOST_NOEXCEPT -{ - state_t previous = state_.exchange( WAITING); - BOOST_ASSERT( RUNNING == previous); -} - void * worker_fiber::get_fss_data( void const* vp) const { diff --git a/src/round_robin.cpp b/src/round_robin.cpp index a02575a7..0a6f448e 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -28,8 +28,44 @@ namespace boost { namespace fibers { +detail::worker_fiber::ptr_t +round_robin::pick_next_() +{ + detail::worker_fiber::ptr_t victim; + if ( ! rqueue_.empty() ) + { + victim.swap( rqueue_.front() ); + rqueue_.pop_front(); + } + return victim; +} + +void +round_robin::resume_( detail::worker_fiber::ptr_t const& f) +{ + BOOST_ASSERT( f); + BOOST_ASSERT( f->is_ready() ); + + // store active fiber in local var + detail::worker_fiber::ptr_t tmp = active_fiber_; + // assign new fiber to active fiber + active_fiber_ = f; + // set active fiber to state_running + active_fiber_->set_running(); + // check if active-fiber calls itself + // this might happend if fiber calls yield() and no + // other fiber is in the ready-queue + if ( tmp != active_fiber_) + // resume active-fiber == start or yield to + active_fiber_->resume( tmp.get() ); + + //BOOST_ASSERT( f == active_fiber_); + // reset active fiber to previous + active_fiber_ = tmp; +} + round_robin::round_robin() BOOST_NOEXCEPT : - active_fiber_(), + active_fiber_( 0), wqueue_(), rqueue_(), mn_() @@ -52,66 +88,55 @@ round_robin::spawn( detail::worker_fiber::ptr_t const& f) BOOST_ASSERT( f); BOOST_ASSERT( f->is_ready() ); - // store active fiber in local var - detail::worker_fiber::ptr_t tmp = active_fiber_; - // assign new fiber to active fiber - active_fiber_ = f; - // set active fiber to state_running - active_fiber_->set_running(); - // resume active fiber - active_fiber_->resume(); - // fiber is resumed - - BOOST_ASSERT( f == active_fiber_); - // reset active fiber to previous - active_fiber_ = tmp; + rqueue_.push_back( f); + //resume_( f); } -bool +void round_robin::run() { - wqueue_t wqueue; - // move all fibers witch are ready (state_ready) - // from waiting-queue to the runnable-queue - for ( wqueue_t::iterator it = wqueue_.begin(), end = wqueue_.end(); it != end; ++it ) + for (;;) { - detail::worker_fiber::ptr_t f( it->f); - - BOOST_ASSERT( ! f->is_running() ); - BOOST_ASSERT( ! f->is_terminated() ); - - // set fiber to state_ready if dead-line was reached - // set fiber to state_ready if interruption was requested - if ( it->tp <= clock_type::now() || f->interruption_requested() ) - f->set_ready(); - if ( f->is_ready() ) - rqueue_.push_back( f); - else wqueue.push( * it); - } - // exchange local with global waiting queue - std::swap( wqueue_, wqueue); - - // pop new fiber from ready-queue which is not complete - // (example: fiber in ready-queue could be canceled by active-fiber) - detail::worker_fiber::ptr_t f; - do - { - if ( rqueue_.empty() ) + // move all fibers witch are ready (state_ready) + // from waiting-queue to the runnable-queue + wqueue_t wqueue; + for ( wqueue_t::iterator it = wqueue_.begin(), end = wqueue_.end(); it != end; ++it ) { - this_thread::yield(); - return false; + detail::worker_fiber::ptr_t f( it->f); + + BOOST_ASSERT( ! f->is_running() ); + BOOST_ASSERT( ! f->is_terminated() ); + + // set fiber to state_ready if dead-line was reached + // set fiber to state_ready if interruption was requested + if ( it->tp <= clock_type::now() || f->interruption_requested() ) + f->set_ready(); + if ( f->is_ready() ) + rqueue_.push_back( f); + else wqueue.push( * it); + } + + // exchange local with global waiting queue + std::swap( wqueue_, wqueue); + + // pop new fiber from ready-queue which is not complete + // (example: fiber in ready-queue could be canceled by active-fiber) + detail::worker_fiber::ptr_t f = pick_next_(); + if ( f) + { + BOOST_ASSERT_MSG( f->is_ready(), "fiber with invalid state in ready-queue"); + resume_( f); + return; + } + else + { + if ( active_fiber_) + active_fiber_->suspend(); + else + this_thread::yield(); + return; } - f.swap( rqueue_.front() ); - rqueue_.pop_front(); - if ( f->is_ready() ) break; - else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); } - while ( true); - - // resume fiber - spawn( f); - - return true; } void @@ -131,9 +156,8 @@ round_robin::wait_until( clock_type::time_point const& timeout_time, lk.unlock(); // push active fiber to wqueue_ wqueue_.push( schedulable( active_fiber_, timeout_time) ); - // suspend active fiber - active_fiber_->suspend(); - // fiber is resumed + // run next fiber + run(); return clock_type::now() < timeout_time; } @@ -148,9 +172,8 @@ round_robin::yield() active_fiber_->set_ready(); // push active fiber to wqueue_ wqueue_.push( schedulable( active_fiber_) ); - // suspend acitive fiber - active_fiber_->suspend(); - // fiber is resumed + // run next fiber + run(); } void @@ -171,18 +194,15 @@ round_robin::join( detail::worker_fiber::ptr_t const& f) // active fiber to state_ready // FIXME: better state_running and no suspend active_fiber_->set_ready(); - // suspend fiber until f terminates - active_fiber_->suspend(); - // fiber is resumed by f + // run next fiber + run(); } else { while ( ! f->is_terminated() ) - { // yield this thread if scheduler did not // resumed some fibers in the previous round - if ( ! run() ) this_thread::yield(); - } + run(); } BOOST_ASSERT( f->is_terminated() ); diff --git a/test/test_barrier.cpp b/test/test_barrier.cpp index ca227672..5708b1d2 100644 --- a/test/test_barrier.cpp +++ b/test/test_barrier.cpp @@ -57,14 +57,10 @@ void test_barrier() boost::fibers::fiber s1( boost::bind( fn1, boost::ref( b) ) ); - BOOST_CHECK( s1); - BOOST_CHECK_EQUAL( 1, value1); boost::fibers::fiber s2( boost::bind( fn2, boost::ref( b) ) ); - BOOST_CHECK( s2); - BOOST_CHECK_EQUAL( 1, value2); s1.join(); s2.join(); diff --git a/test/test_fiber.cpp b/test/test_fiber.cpp index 0a3438f8..7941a29a 100644 --- a/test/test_fiber.cpp +++ b/test/test_fiber.cpp @@ -218,7 +218,7 @@ void test_detach() { { boost::fibers::fiber s1( f1); - BOOST_CHECK( ! s1); + BOOST_CHECK( s1); s1.detach(); BOOST_CHECK( ! s1); BOOST_CHECK( ! s1.joinable() ); @@ -238,7 +238,7 @@ void test_replace() boost::fibers::round_robin ds; boost::fibers::set_scheduling_algorithm( & ds); boost::fibers::fiber s1( f1); - BOOST_CHECK( ! s1); + BOOST_CHECK( s1); boost::fibers::fiber s2( f2); BOOST_CHECK( s2); @@ -249,7 +249,7 @@ void test_replace() void test_complete() { boost::fibers::fiber s1( f1); - BOOST_CHECK( ! s1); + BOOST_CHECK( s1); boost::fibers::fiber s2( f2); BOOST_CHECK( s2); @@ -338,10 +338,8 @@ void test_fiber_interrupts_at_join() int i = 0; bool failed = false; boost::fibers::fiber f1( boost::bind( f7, boost::ref( i), boost::ref( failed) ) ); - BOOST_CHECK_EQUAL( 1, i); boost::fibers::fiber f2( boost::bind( interruption_point_join, boost::ref( f1) ) ); f1.interrupt(); - BOOST_CHECK_EQUAL( 1, i); f2.join(); BOOST_CHECK_EQUAL( 1, i); BOOST_CHECK( failed); diff --git a/test/test_mutex.cpp b/test/test_mutex.cpp index 74e000ee..2f8c3170 100644 --- a/test/test_mutex.cpp +++ b/test/test_mutex.cpp @@ -259,8 +259,6 @@ struct test_exclusive boost::bind( & fn2< mutex_type >, boost::ref( mtx) ) ); BOOST_ASSERT( f1); BOOST_ASSERT( f2); - BOOST_CHECK_EQUAL( 1, value1); - BOOST_CHECK_EQUAL( 1, value2); f1.join(); f2.join();