From 436883ca3358fe10955a4da3f2eb1289cd5b4efa Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Fri, 25 Oct 2013 18:01:56 +0200 Subject: [PATCH] add workstealing round_robin --- build/Jamfile.v2 | 1 + examples/simple.cpp | 49 +++++ include/boost/fiber/algorithm.hpp | 3 - include/boost/fiber/all.hpp | 1 + include/boost/fiber/asio/round_robin.hpp | 7 - include/boost/fiber/round_robin.hpp | 7 +- include/boost/fiber/round_robin_ws.hpp | 95 +++++++++ src/round_robin.cpp | 22 +-- src/round_robin_ws.cpp | 233 +++++++++++++++++++++++ test/test_migration.cpp | 10 +- 10 files changed, 390 insertions(+), 38 deletions(-) create mode 100644 include/boost/fiber/round_robin_ws.hpp create mode 100644 src/round_robin_ws.cpp diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index 24652250..1df9bc1f 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -45,6 +45,7 @@ lib boost_fiber recursive_mutex.cpp recursive_timed_mutex.cpp round_robin.cpp + round_robin_ws.cpp timed_mutex.cpp : shared:../../atomic/build//boost_atomic shared:../../coroutine/build//boost_coroutine diff --git a/examples/simple.cpp b/examples/simple.cpp index 0e4c485f..4c6338ba 100644 --- a/examples/simple.cpp +++ b/examples/simple.cpp @@ -2,10 +2,28 @@ #include #include +#include #include +#include #include +struct X +{ + typedef boost::intrusive_ptr< X > ptr_t; + + std::size_t use_count; + + X() : use_count( 0) {} + + friend inline void intrusive_ptr_add_ref( X * p) BOOST_NOEXCEPT + { ++p->use_count; } + + friend inline void intrusive_ptr_release( X * p) + { if ( 0 == --p->use_count) delete p; } + +}; + inline void fn( std::string const& str, int n) { @@ -23,11 +41,42 @@ int main() try { +#if 0 boost::fibers::fiber f1( boost::bind( fn, "abc", 5) ); boost::fibers::fiber f2( boost::bind( fn, "xyz", 7) ); f1.join(); f2.join(); +#endif + + X::ptr_t x1( new X() ); + X::ptr_t x2( new X() ); + std::cout << "x1->use-count: " << x1->use_count << std::endl; + std::cout << "x2->use-count: " << x2->use_count << std::endl; + + intrusive_ptr_add_ref( x1.get() ); + intrusive_ptr_add_ref( x2.get() ); + std::cout << "x1->use-count: " << x1->use_count << std::endl; + std::cout << "x2->use-count: " << x2->use_count << std::endl; + + X::ptr_t x3( 0); + boost::atomic< X::ptr_t > av1( x1); + boost::atomic< X::ptr_t > av2( x3); + std::cout << "x1->use-count: " << x1->use_count << std::endl; + std::cout << "x2->use-count: " << x2->use_count << std::endl; + + BOOST_ASSERT( av1.compare_exchange_strong( x1, x2) ); + std::cout << "x1->use-count: " << x1->use_count << std::endl; + std::cout << "x2->use-count: " << x2->use_count << std::endl; + //BOOST_ASSERT( av1.is_lock_free() ); + BOOST_ASSERT( av1.load() != x1); + BOOST_ASSERT( av1.load() == x2); + + BOOST_ASSERT( av2.compare_exchange_strong( x3, x1) ); + std::cout << "x1->use-count: " << x1->use_count << std::endl; + std::cout << "x2->use-count: " << x2->use_count << std::endl; + BOOST_ASSERT( av2.load() == x1); + BOOST_ASSERT( av2.load() != x2); std::cout << "done." << std::endl; diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index 8ed66cb7..3e85e2ee 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -51,9 +51,6 @@ struct algorithm : private noncopyable virtual void yield() = 0; virtual ~algorithm() {} - - virtual fiber steal_from() = 0; - virtual void migrate_to( fiber const&) = 0; }; }} diff --git a/include/boost/fiber/all.hpp b/include/boost/fiber/all.hpp index de5d0472..abc8dafb 100644 --- a/include/boost/fiber/all.hpp +++ b/include/boost/fiber/all.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include diff --git a/include/boost/fiber/asio/round_robin.hpp b/include/boost/fiber/asio/round_robin.hpp index 05b7130c..f8869fda 100644 --- a/include/boost/fiber/asio/round_robin.hpp +++ b/include/boost/fiber/asio/round_robin.hpp @@ -8,13 +8,11 @@ #define BOOST_FIBERS_ASIO_ROUND_ROBIN_HPP #include -#include #include #include #include #include -#include #include #include @@ -83,11 +81,6 @@ public: unique_lock< detail::spinlock > &); void yield(); - - fiber steal_from() - { BOOST_THROW_EXCEPTION( std::domain_error("not implemented") ); } - void migrate_to( fiber const&) - { BOOST_THROW_EXCEPTION( std::domain_error("not implemented") ); } }; }}} diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index f7ada302..cf887cc1 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #ifdef BOOST_HAS_ABI_HEADERS @@ -51,10 +50,11 @@ private: }; typedef std::deque< schedulable > wqueue_t; + typedef std::deque< detail::fiber_base::ptr_t > rqueue_t; detail::fiber_base::ptr_t active_fiber_; wqueue_t wqueue_; - detail::ws_queue rqueue_; + rqueue_t rqueue_; public: round_robin() BOOST_NOEXCEPT; @@ -77,9 +77,6 @@ public: unique_lock< detail::spinlock > &); void yield(); - - fiber steal_from(); - void migrate_to( fiber const&); }; }} diff --git a/include/boost/fiber/round_robin_ws.hpp b/include/boost/fiber/round_robin_ws.hpp new file mode 100644 index 00000000..3924a3ab --- /dev/null +++ b/include/boost/fiber/round_robin_ws.hpp @@ -0,0 +1,95 @@ +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H +#define BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +# if defined(BOOST_MSVC) +# pragma warning(push) +# pragma warning(disable:4251 4275) +# endif + +namespace boost { +namespace fibers { + +class BOOST_FIBERS_DECL round_robin_ws : public algorithm +{ +private: + struct schedulable + { + detail::fiber_base::ptr_t f; + clock_type::time_point tp; + + schedulable( detail::fiber_base::ptr_t const& f_, + clock_type::time_point const& tp_ = + (clock_type::time_point::max)() ) : + f( f_), tp( tp_) + { BOOST_ASSERT( f); } + }; + + typedef std::deque< schedulable > wqueue_t; + + detail::fiber_base::ptr_t active_fiber_; + wqueue_t wqueue_; + detail::ws_queue rqueue_; + +public: + round_robin_ws() BOOST_NOEXCEPT; + + ~round_robin_ws() BOOST_NOEXCEPT; + + void spawn( detail::fiber_base::ptr_t const&); + + void priority( detail::fiber_base::ptr_t const&, int); + + void join( detail::fiber_base::ptr_t const&); + + detail::fiber_base::ptr_t active() BOOST_NOEXCEPT + { return active_fiber_; } + + bool run(); + + void wait( unique_lock< detail::spinlock > &); + bool wait_until( clock_type::time_point const&, + unique_lock< detail::spinlock > &); + + void yield(); + + fiber steal_from(); + void migrate_to( fiber const&); +}; + +}} + +# if defined(BOOST_MSVC) +# pragma warning(pop) +# endif + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H diff --git a/src/round_robin.cpp b/src/round_robin.cpp index b7fe52a5..df48f684 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -86,7 +86,7 @@ round_robin::run() if ( f->interruption_requested() ) f->set_ready(); if ( f->is_ready() ) - rqueue_.push( f); + rqueue_.push_back( f); else wqueue.push_back( s); } // exchange local with global waiting queue @@ -97,7 +97,9 @@ round_robin::run() detail::fiber_base::ptr_t f; do { - if ( ! rqueue_.try_pop( f) ) return false; + if ( rqueue_.empty() ) return false; + f.swap( rqueue_.front() ); + rqueue_.pop_front(); if ( f->is_ready() ) break; else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); } @@ -210,22 +212,6 @@ round_robin::priority( detail::fiber_base::ptr_t const& f, int prio) f->priority( prio); } -fiber -round_robin::steal_from() -{ - detail::fiber_base::ptr_t f; - if ( ! rqueue_.try_pop( f) ) return fiber(); - return fiber( f); -} - -void -round_robin::migrate_to( fiber const& f) -{ - BOOST_ASSERT( f); - - spawn( detail::scheduler::extract( f) ); -} - }} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/src/round_robin_ws.cpp b/src/round_robin_ws.cpp new file mode 100644 index 00000000..9de16028 --- /dev/null +++ b/src/round_robin_ws.cpp @@ -0,0 +1,233 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "boost/fiber/round_robin_ws.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "boost/fiber/detail/scheduler.hpp" +#include "boost/fiber/exceptions.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { + +round_robin_ws::round_robin_ws() BOOST_NOEXCEPT : + active_fiber_(), + wqueue_(), + rqueue_() +{} + +round_robin_ws::~round_robin_ws() BOOST_NOEXCEPT +{ + // fibers will be destroyed (stack-unwinding) + // if last reference goes out-of-scope + // therefore destructing wqueue_ && rqueue_ + // will destroy the fibers in this scheduler + // if not referenced on other places + if ( detail::scheduler::instance() == this) + detail::scheduler::replace( 0); +} + +void +round_robin_ws::spawn( detail::fiber_base::ptr_t const& f) +{ + BOOST_ASSERT( f); + BOOST_ASSERT( f->is_ready() ); + + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // assign new fiber to active fiber + active_fiber_ = f; + // set active fiber to state_running + active_fiber_->set_running(); + // resume active fiber + active_fiber_->resume(); + // fiber is resumed + + BOOST_ASSERT( f == active_fiber_); + // reset active fiber to previous + active_fiber_ = tmp; +} + +bool +round_robin_ws::run() +{ + wqueue_t wqueue; + // move all fibers witch are ready (state_ready) + // from waiting-queue to the runnable-queue + BOOST_FOREACH( schedulable const& s, wqueue_) + { + detail::fiber_base::ptr_t f( s.f); + + BOOST_ASSERT( ! f->is_running() ); + BOOST_ASSERT( ! f->is_terminated() ); + + // set fiber to state_ready if dead-line was reached + if ( s.tp <= clock_type::now() ) + f->set_ready(); + // set fiber to state_ready if interruption was requested + if ( f->interruption_requested() ) + f->set_ready(); + if ( f->is_ready() ) + rqueue_.push( f); + else wqueue.push_back( s); + } + // exchange local with global waiting queue + wqueue_.swap( wqueue); + + // pop new fiber from ready-queue which is not complete + // (example: fiber in ready-queue could be canceled by active-fiber) + detail::fiber_base::ptr_t f; + do + { + if ( ! rqueue_.try_pop( f) ) return false; + if ( f->is_ready() ) break; + else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); + } + while ( true); + + // resume fiber + spawn( f); + + return true; +} + +void +round_robin_ws::wait( unique_lock< detail::spinlock > & lk) +{ wait_until( clock_type::time_point( (clock_type::duration::max)() ), lk); } + +bool +round_robin_ws::wait_until( clock_type::time_point const& timeout_time, + unique_lock< detail::spinlock > & lk) +{ + clock_type::time_point start( clock_type::now() ); + + BOOST_ASSERT( active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); + + // set active fiber to state_waiting + active_fiber_->set_waiting(); + // release lock + lk.unlock(); + // push active fiber to wqueue_ + wqueue_.push_back( schedulable( active_fiber_, timeout_time) ); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend active fiber + active_fiber_->suspend(); + // fiber is resumed + + BOOST_ASSERT( tmp == detail::scheduler::instance()->active() ); + BOOST_ASSERT( tmp->is_running() ); + + return clock_type::now() < timeout_time; +} + +void +round_robin_ws::yield() +{ + BOOST_ASSERT( active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); + + // set active fiber to state_waiting + active_fiber_->set_ready(); + // push active fiber to wqueue_ + wqueue_.push_back( schedulable( active_fiber_) ); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend acitive fiber + active_fiber_->suspend(); + // fiber is resumed + + BOOST_ASSERT( tmp == detail::scheduler::instance()->active() ); + BOOST_ASSERT( tmp->is_running() ); +} + +void +round_robin_ws::join( detail::fiber_base::ptr_t const& f) +{ + BOOST_ASSERT( f); + BOOST_ASSERT( f != active_fiber_); + + if ( active_fiber_) + { + // set active fiber to state_waiting + active_fiber_->set_waiting(); + // push active fiber to wqueue_ + wqueue_.push_back( schedulable( active_fiber_) ); + // add active fiber to joinig-list of f + if ( ! f->join( active_fiber_) ) + // f must be already terminated therefore we set + // active fiber to state_ready + // FIXME: better state_running and no suspend + active_fiber_->set_ready(); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend fiber until f terminates + active_fiber_->suspend(); + // fiber is resumed by f + + BOOST_ASSERT( tmp == detail::scheduler::instance()->active() ); + BOOST_ASSERT( tmp->is_running() ); + } + else + { + while ( ! f->is_terminated() ) + { + // yield this thread if scheduler did not + // resumed some fibers in the previous round + if ( ! run() ) this_thread::yield(); + } + } + + BOOST_ASSERT( f->is_terminated() ); +} + +void +round_robin_ws::priority( detail::fiber_base::ptr_t const& f, int prio) +{ + BOOST_ASSERT( f); + + // set only priority to fiber + // round-robin does not respect priorities + f->priority( prio); +} + +fiber +round_robin_ws::steal_from() +{ + detail::fiber_base::ptr_t f; + if ( ! rqueue_.try_pop( f) ) return fiber(); + return fiber( f); +} + +void +round_robin_ws::migrate_to( fiber const& f) +{ + BOOST_ASSERT( f); + + spawn( detail::scheduler::extract( f) ); +} + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/test/test_migration.cpp b/test/test_migration.cpp index 5b9205b5..e9d9534e 100644 --- a/test/test_migration.cpp +++ b/test/test_migration.cpp @@ -21,7 +21,7 @@ #include boost::atomic< bool > fini( false); -boost::fibers::algorithm * other_ds = 0; +boost::fibers::round_robin_ws * other_ds = 0; boost::fibers::future< int > fibonacci( int); @@ -55,7 +55,7 @@ int create_fiber( int n) return fibonacci( n).get(); } -void fn_create_fibers( boost::fibers::round_robin * ds, boost::barrier * b) +void fn_create_fibers( boost::fibers::round_robin_ws * ds, boost::barrier * b) { boost::fibers::set_scheduling_algorithm( ds); @@ -68,10 +68,10 @@ void fn_create_fibers( boost::fibers::round_robin * ds, boost::barrier * b) fini = true; } -void fn_migrate_fibers( boost::fibers::round_robin * other_ds, boost::barrier * b, int * count) +void fn_migrate_fibers( boost::fibers::round_robin_ws * other_ds, boost::barrier * b, int * count) { BOOST_ASSERT( other_ds); - boost::fibers::round_robin ds; + boost::fibers::round_robin_ws ds; boost::fibers::set_scheduling_algorithm( & ds); b->wait(); @@ -97,7 +97,7 @@ void test_migrate_fiber() fini = false; int count = 0; - boost::fibers::round_robin * ds = new boost::fibers::round_robin(); + boost::fibers::round_robin_ws * ds = new boost::fibers::round_robin_ws(); boost::barrier b( 2); boost::thread t1( boost::bind( fn_create_fibers, ds, &b) ); boost::thread t2( boost::bind( fn_migrate_fibers, ds, &b, &count) );