diff --git a/include/boost/fiber/detail/container.hpp b/include/boost/fiber/detail/container.hpp index 90ecc0f6..c6f85f6c 100644 --- a/include/boost/fiber/detail/container.hpp +++ b/include/boost/fiber/detail/container.hpp @@ -86,6 +86,9 @@ public: void erase( iterator const& i) { base_.erase( i); } + void erase( iterator const& i, iterator const& e) + { base_.erase( i, e); } + void swap( container & other) { base_.swap( other); } diff --git a/include/boost/fiber/detail/fiber_base.hpp b/include/boost/fiber/detail/fiber_base.hpp index 45705ed7..53678f59 100644 --- a/include/boost/fiber/detail/fiber_base.hpp +++ b/include/boost/fiber/detail/fiber_base.hpp @@ -128,7 +128,7 @@ public: void yield(); - void terminate(); + void release(); bool join( ptr_t const&); @@ -182,14 +182,38 @@ public: // it is set after the fiber-function was left == at the end of exec() void set_terminated() BOOST_NOEXCEPT { + // other thread could have called set_ready() before + // case: - this fiber has joined another fiber running in another thread, + // - other fiber terminated and releases its joining fibers + // - this fiber was interrupted before and therefore resumed + // and throws fiber_interrupted + // - fiber_interrupted was not catched and swallowed + // - other fiber calls set_ready() on this fiber happend before this + // fiber calls set_terminated() + // - this fiber stack gets unwound and set_terminated() is called at the end state_t previous = state_.exchange( state_terminated, memory_order_release); BOOST_ASSERT( state_running == previous || state_ready == previous); } void set_ready() BOOST_NOEXCEPT { - state_t previous = state_.exchange( state_ready, memory_order_release); - BOOST_ASSERT( state_running == previous || state_waiting == previous); + // this fiber calls set_ready(): - only transition from state_waiting (wake-up) + // - or transition from state_running (yield) allowed + // other fiber calls set_ready(): - only if this fiber was joinig other fiber + // - if this fiber was not interrupted then this fiber + // should in state_waiting + // - if this fiber was interrupted the this fiber might + // be in state_ready, state_running or already in + // state_terminated + for (;;) + { + state_t expected = state_waiting; + bool result = state_.compare_exchange_strong( expected, state_ready, memory_order_release); + if ( result || state_terminated == expected || state_ready == expected) return; + expected = state_running; + result = state_.compare_exchange_strong( expected, state_ready, memory_order_release); + if ( result || state_terminated == expected || state_ready == expected) return; + } } void set_running() BOOST_NOEXCEPT @@ -200,8 +224,17 @@ public: void set_waiting() BOOST_NOEXCEPT { + // other thread could have called set_ready() before + // case: - this fiber has joined another fiber running in another thread, + // - other fiber terminated and releases its joining fibers + // - this fiber was interrupted and therefore resumed and + // throws fiber_interrupted + // - fiber_interrupted was catched and swallowed + // - other fiber calls set_ready() on this fiber happend before this + // fiber calls set_waiting() + // - this fiber might wait on some sync. primitive calling set_waiting() state_t previous = state_.exchange( state_waiting, memory_order_release); - BOOST_ASSERT( state_running == previous); + BOOST_ASSERT( state_running == previous || state_ready == previous); } state_t state() const BOOST_NOEXCEPT diff --git a/include/boost/fiber/detail/fiber_object.hpp b/include/boost/fiber/detail/fiber_object.hpp index 9162adbd..c6dd01e1 100644 --- a/include/boost/fiber/detail/fiber_object.hpp +++ b/include/boost/fiber/detail/fiber_object.hpp @@ -135,7 +135,7 @@ public: #endif ~fiber_object() - { terminate(); } + { release(); } void exec() { @@ -225,7 +225,7 @@ public: { enter_(); } ~fiber_object() - { terminate(); } + { release(); } void exec() { @@ -315,7 +315,7 @@ public: { enter_(); } ~fiber_object() - { terminate(); } + { release(); } void exec() { diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 0971d2de..18f2df08 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -35,11 +35,11 @@ namespace fibers { class BOOST_FIBERS_DECL round_robin : public algorithm { private: - typedef detail::container<> container_t; + typedef detail::container<> wqueue_t; typedef std::deque< detail::fiber_base::ptr_t > rqueue_t; detail::fiber_base::ptr_t active_fiber_; - container_t fibers_; + wqueue_t wqueue_; detail::spinlock rqueue_mtx_; rqueue_t rqueue_; diff --git a/src/detail/fiber_base.cpp b/src/detail/fiber_base.cpp index 2db2fa1b..c73e31cc 100644 --- a/src/detail/fiber_base.cpp +++ b/src/detail/fiber_base.cpp @@ -70,7 +70,7 @@ fiber_base::yield() } void -fiber_base::terminate() +fiber_base::release() { if ( ! is_terminated() ) unwind_stack(); @@ -79,7 +79,7 @@ fiber_base::terminate() // protect against concurrent access to joining_ unique_lock< spinlock > lk( joining_mtx_); BOOST_FOREACH( fiber_base::ptr_t & p, joining_) - { if ( ! p->is_terminated() ) p->set_ready(); } + { p->set_ready(); } } bool diff --git a/src/interruption.cpp b/src/interruption.cpp index 4d4d9c44..594cd2f1 100644 --- a/src/interruption.cpp +++ b/src/interruption.cpp @@ -61,14 +61,11 @@ bool interruption_requested() BOOST_NOEXCEPT void interruption_point() { - if ( interruption_requested() ) - { - if ( interruption_enabled() ) + if ( interruption_requested() && interruption_enabled() ) { fibers::detail::scheduler::instance().active()->request_interruption( false); throw fibers::fiber_interrupted(); } - } } }} diff --git a/src/round_robin.cpp b/src/round_robin.cpp index 3cd38547..f3096a01 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -1,9 +1,10 @@ + // Copyright Oliver Kowalke 2009. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) -#define BOOST_FIBERS_SOURCE +#define BOOST_wqueue_SOURCE #include @@ -30,7 +31,7 @@ namespace fibers { round_robin::round_robin() : active_fiber_(), - fibers_(), + wqueue_(), rqueue_mtx_(), rqueue_() {} @@ -39,15 +40,15 @@ round_robin::~round_robin() { BOOST_ASSERT( ! active_fiber_); - unique_lock< detail::spinlock > lk( rqueue_mtx_); - rqueue_.clear(); - lk.unlock(); - - BOOST_FOREACH( detail::fiber_base::ptr_t const& p, fibers_) + BOOST_FOREACH( detail::fiber_base::ptr_t const& p, wqueue_) { p->request_interruption( true); - while ( ! fibers_.empty() ) run(); } + + unique_lock< detail::spinlock > lk( rqueue_mtx_); + rqueue_.insert( rqueue_.end(), wqueue_.begin(), wqueue_.end() ); + lk.unlock(); + while ( ! wqueue_.empty() ) run(); } void @@ -64,10 +65,14 @@ round_robin::spawn( detail::fiber_base::ptr_t const& f) active_fiber_ = f; active_fiber_->set_running(); active_fiber_->resume(); - if ( ! f->is_terminated() ) - { - fibers_.push_back( f); - } + // after return from fiber::resume() the fiber is: + // ready -> already in requeue_ + // waiting -> already in wqueue_ + // terminated -> not stored in round_robin/will be deleted + // call terminate() in order to release + // joining fibers + if ( active_fiber_->is_terminated() ) + f->release(); } void @@ -88,16 +93,19 @@ round_robin::join( detail::fiber_base::ptr_t const& f) if ( active_fiber_) { - // set active_fiber to state_waiting + // set active fiber to state_waiting active_fiber_->set_waiting(); - // add active_fiber_ to joinig-list of f + // push active fiber to wqueue_ + wqueue_.push_back( active_fiber_); + // add active fiber to joinig-list of f if ( ! f->join( active_fiber_) ) - //FIXME: in-performant -> better state changed to running + // f must be already terminated therefore we set + // active fiber to state_ready + // FIXME: better state_running and no suspend active_fiber_->set_ready(); - // suspend active-fiber until f terminates + // suspend active fiber until f terminates active_fiber_->suspend(); - // fiber is resumed - // f has teminated and active-fiber is resumed + // active fiber is resumed and f has teminated // check if fiber was interrupted this_fiber::interruption_point(); @@ -106,7 +114,7 @@ round_robin::join( detail::fiber_base::ptr_t const& f) { while ( ! f->is_terminated() ) { - //FIXME: this_thread::yield() ? + //FIXME: call this_thread::yield() before ? run(); } } @@ -118,50 +126,36 @@ bool round_robin::run() { // stable-sort has n*log(n) complexity if n*log(n) extra space is available - std::size_t n = fibers_.size(); + std::size_t n = wqueue_.size(); if ( 1 < n) { std::size_t new_capacity = n * std::log10( n) + n; - if ( fibers_.capacity() < new_capacity) - fibers_.reserve( new_capacity); + if ( wqueue_.capacity() < new_capacity) + wqueue_.reserve( new_capacity); - // sort fibers_ depending on state - fibers_.sort(); + // sort wqueue_ depending on state + wqueue_.sort(); } - // check which waiting fdiber should be interrupted + // check which waiting fiber should be interrupted // make it ready and add it to rqueue_ - std::pair< container_t::iterator, container_t::iterator > p = - fibers_.equal_range( detail::state_waiting); - for ( container_t::iterator i = p.first; i != p.second; ++i) + std::pair< wqueue_t::iterator, wqueue_t::iterator > p = + wqueue_.equal_range( detail::state_waiting); + for ( wqueue_t::iterator i = p.first; i != p.second; ++i) { if ( ( * i)->interruption_requested() ) - { ( * i)->set_ready(); - unique_lock< detail::spinlock > lk( rqueue_mtx_); - rqueue_.push_back( * i); - } } // copy all ready fibers to rqueue_ - p = fibers_.equal_range( detail::state_ready); + // and remove fibers from wqueue_ + p = wqueue_.equal_range( detail::state_ready); if ( p.first != p.second) { unique_lock< detail::spinlock > lk( rqueue_mtx_); rqueue_.insert( rqueue_.end(), p.first, p.second); - } - - // remove all terminated fibers from fibers_ - p = fibers_.equal_range( detail::state_terminated); - for ( container_t::iterator i = p.first; i != p.second; ++i) - { - ( * i)->terminate(); - fibers_.erase( i); - } - - { - unique_lock< detail::spinlock > lk( rqueue_mtx_); - if ( rqueue_.empty() ) return false; + lk.unlock(); + wqueue_.erase( p.first, p.second); } // pop new fiber from runnable-queue which is not complete @@ -184,6 +178,10 @@ round_robin::run() // resume new active fiber active_fiber_->set_running(); active_fiber_->resume(); + // call terminate() in order to release + // joining fibers if resumed fiber has terminated + if ( active_fiber_->is_terminated() ) + f->release(); return true; } @@ -196,8 +194,10 @@ round_robin::wait( unique_lock< detail::spinlock > & lk) // set active_fiber to state_waiting active_fiber_->set_waiting(); - // unlock Lock assoc. with sync. primitive + // unlock assoc. sync. primitive lk.unlock(); + // push active fiber to wqueue_ + wqueue_.push_back( active_fiber_); // suspend fiber active_fiber_->suspend(); // fiber is resumed @@ -211,13 +211,14 @@ round_robin::yield() BOOST_ASSERT( active_fiber_); BOOST_ASSERT( active_fiber_->is_running() ); - // yield() suspends the fiber and adds it - // immediately to ready-queue - unique_lock< detail::spinlock > lk( rqueue_mtx_); - rqueue_.push_back( active_fiber_); - lk.unlock(); // set active_fiber to state_ready active_fiber_->set_ready(); + // suspends active fiber and adds it to wqueue_ + // Note: adding to rqueue_ could result in a raise + // between adding to rqueue_ and calling yield another + // thread could steel fiber from rqueue_ and resume it + // at the same time as yield is called + wqueue_.push_back( active_fiber_); // suspend fiber active_fiber_->yield(); // fiber is resumed @@ -231,8 +232,7 @@ round_robin::exec_in( detail::fiber_base::ptr_t const& f) BOOST_ASSERT( f); BOOST_ASSERT( f->is_ready() ); - unique_lock< detail::spinlock > lk( rqueue_mtx_); - rqueue_.push_back( f); + wqueue_.push_back( f); } detail::fiber_base::ptr_t diff --git a/test/test_fiber.cpp b/test/test_fiber.cpp index 3bb88760..bedba4c4 100644 --- a/test/test_fiber.cpp +++ b/test/test_fiber.cpp @@ -76,7 +76,6 @@ void f2() void f4() { boost::fibers::fiber s( f2); - std::cout << s.get_id() << "\n"; BOOST_CHECK( s); BOOST_CHECK( s.joinable() ); s.join(); @@ -269,9 +268,8 @@ void test_join_in_fiber() // s' yields in its fiber-fn // s joins s' and gets suspended (waiting on s') boost::fibers::fiber s( f4); - std::cout << s.get_id() << "\n"; // run() resumes s + s' which completes - if ( s.joinable() ) s.join(); + s.join(); BOOST_CHECK( ! s); } @@ -354,6 +352,7 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* []) { boost::unit_test::test_suite * test = BOOST_TEST_SUITE("Boost.Fiber: fiber test suite"); + test->add( BOOST_TEST_CASE( & test_move) ); test->add( BOOST_TEST_CASE( & test_id) ); test->add( BOOST_TEST_CASE( & test_priority) ); @@ -368,5 +367,6 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* []) test->add( BOOST_TEST_CASE( & test_fiber_interrupts_at_interruption_point) ); test->add( BOOST_TEST_CASE( & test_fiber_no_interrupt_if_interrupts_disabled_at_interruption_point) ); test->add( BOOST_TEST_CASE( & test_fiber_interrupts_at_join) ); + return test; }