diff --git a/build/Jamfile.v2 b/build/Jamfile.v2
index 24652250..1df9bc1f 100644
--- a/build/Jamfile.v2
+++ b/build/Jamfile.v2
@@ -45,6 +45,7 @@ lib boost_fiber
recursive_mutex.cpp
recursive_timed_mutex.cpp
round_robin.cpp
+ round_robin_ws.cpp
timed_mutex.cpp
: shared:../../atomic/build//boost_atomic
shared:../../coroutine/build//boost_coroutine
diff --git a/examples/simple.cpp b/examples/simple.cpp
index 0e4c485f..4c6338ba 100644
--- a/examples/simple.cpp
+++ b/examples/simple.cpp
@@ -2,10 +2,28 @@
#include
#include
+#include
#include
+#include
#include
+struct X
+{
+ typedef boost::intrusive_ptr< X > ptr_t;
+
+ std::size_t use_count;
+
+ X() : use_count( 0) {}
+
+ friend inline void intrusive_ptr_add_ref( X * p) BOOST_NOEXCEPT
+ { ++p->use_count; }
+
+ friend inline void intrusive_ptr_release( X * p)
+ { if ( 0 == --p->use_count) delete p; }
+
+};
+
inline
void fn( std::string const& str, int n)
{
@@ -23,11 +41,42 @@ int main()
try
{
+#if 0
boost::fibers::fiber f1( boost::bind( fn, "abc", 5) );
boost::fibers::fiber f2( boost::bind( fn, "xyz", 7) );
f1.join();
f2.join();
+#endif
+
+ X::ptr_t x1( new X() );
+ X::ptr_t x2( new X() );
+ std::cout << "x1->use-count: " << x1->use_count << std::endl;
+ std::cout << "x2->use-count: " << x2->use_count << std::endl;
+
+ intrusive_ptr_add_ref( x1.get() );
+ intrusive_ptr_add_ref( x2.get() );
+ std::cout << "x1->use-count: " << x1->use_count << std::endl;
+ std::cout << "x2->use-count: " << x2->use_count << std::endl;
+
+ X::ptr_t x3( 0);
+ boost::atomic< X::ptr_t > av1( x1);
+ boost::atomic< X::ptr_t > av2( x3);
+ std::cout << "x1->use-count: " << x1->use_count << std::endl;
+ std::cout << "x2->use-count: " << x2->use_count << std::endl;
+
+ BOOST_ASSERT( av1.compare_exchange_strong( x1, x2) );
+ std::cout << "x1->use-count: " << x1->use_count << std::endl;
+ std::cout << "x2->use-count: " << x2->use_count << std::endl;
+ //BOOST_ASSERT( av1.is_lock_free() );
+ BOOST_ASSERT( av1.load() != x1);
+ BOOST_ASSERT( av1.load() == x2);
+
+ BOOST_ASSERT( av2.compare_exchange_strong( x3, x1) );
+ std::cout << "x1->use-count: " << x1->use_count << std::endl;
+ std::cout << "x2->use-count: " << x2->use_count << std::endl;
+ BOOST_ASSERT( av2.load() == x1);
+ BOOST_ASSERT( av2.load() != x2);
std::cout << "done." << std::endl;
diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp
index 8ed66cb7..3e85e2ee 100644
--- a/include/boost/fiber/algorithm.hpp
+++ b/include/boost/fiber/algorithm.hpp
@@ -51,9 +51,6 @@ struct algorithm : private noncopyable
virtual void yield() = 0;
virtual ~algorithm() {}
-
- virtual fiber steal_from() = 0;
- virtual void migrate_to( fiber const&) = 0;
};
}}
diff --git a/include/boost/fiber/all.hpp b/include/boost/fiber/all.hpp
index de5d0472..abc8dafb 100644
--- a/include/boost/fiber/all.hpp
+++ b/include/boost/fiber/all.hpp
@@ -27,6 +27,7 @@
#include
#include
#include
+#include
#include
#include
diff --git a/include/boost/fiber/asio/round_robin.hpp b/include/boost/fiber/asio/round_robin.hpp
index 05b7130c..f8869fda 100644
--- a/include/boost/fiber/asio/round_robin.hpp
+++ b/include/boost/fiber/asio/round_robin.hpp
@@ -8,13 +8,11 @@
#define BOOST_FIBERS_ASIO_ROUND_ROBIN_HPP
#include
-#include
#include
#include
#include
#include
-#include
#include
#include
@@ -83,11 +81,6 @@ public:
unique_lock< detail::spinlock > &);
void yield();
-
- fiber steal_from()
- { BOOST_THROW_EXCEPTION( std::domain_error("not implemented") ); }
- void migrate_to( fiber const&)
- { BOOST_THROW_EXCEPTION( std::domain_error("not implemented") ); }
};
}}}
diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp
index f7ada302..cf887cc1 100644
--- a/include/boost/fiber/round_robin.hpp
+++ b/include/boost/fiber/round_robin.hpp
@@ -20,7 +20,6 @@
#include
#include
#include
-#include
#include
#ifdef BOOST_HAS_ABI_HEADERS
@@ -51,10 +50,11 @@ private:
};
typedef std::deque< schedulable > wqueue_t;
+ typedef std::deque< detail::fiber_base::ptr_t > rqueue_t;
detail::fiber_base::ptr_t active_fiber_;
wqueue_t wqueue_;
- detail::ws_queue rqueue_;
+ rqueue_t rqueue_;
public:
round_robin() BOOST_NOEXCEPT;
@@ -77,9 +77,6 @@ public:
unique_lock< detail::spinlock > &);
void yield();
-
- fiber steal_from();
- void migrate_to( fiber const&);
};
}}
diff --git a/include/boost/fiber/round_robin_ws.hpp b/include/boost/fiber/round_robin_ws.hpp
new file mode 100644
index 00000000..3924a3ab
--- /dev/null
+++ b/include/boost/fiber/round_robin_ws.hpp
@@ -0,0 +1,95 @@
+// Copyright Oliver Kowalke 2013.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H
+#define BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+# if defined(BOOST_MSVC)
+# pragma warning(push)
+# pragma warning(disable:4251 4275)
+# endif
+
+namespace boost {
+namespace fibers {
+
+class BOOST_FIBERS_DECL round_robin_ws : public algorithm
+{
+private:
+ struct schedulable
+ {
+ detail::fiber_base::ptr_t f;
+ clock_type::time_point tp;
+
+ schedulable( detail::fiber_base::ptr_t const& f_,
+ clock_type::time_point const& tp_ =
+ (clock_type::time_point::max)() ) :
+ f( f_), tp( tp_)
+ { BOOST_ASSERT( f); }
+ };
+
+ typedef std::deque< schedulable > wqueue_t;
+
+ detail::fiber_base::ptr_t active_fiber_;
+ wqueue_t wqueue_;
+ detail::ws_queue rqueue_;
+
+public:
+ round_robin_ws() BOOST_NOEXCEPT;
+
+ ~round_robin_ws() BOOST_NOEXCEPT;
+
+ void spawn( detail::fiber_base::ptr_t const&);
+
+ void priority( detail::fiber_base::ptr_t const&, int);
+
+ void join( detail::fiber_base::ptr_t const&);
+
+ detail::fiber_base::ptr_t active() BOOST_NOEXCEPT
+ { return active_fiber_; }
+
+ bool run();
+
+ void wait( unique_lock< detail::spinlock > &);
+ bool wait_until( clock_type::time_point const&,
+ unique_lock< detail::spinlock > &);
+
+ void yield();
+
+ fiber steal_from();
+ void migrate_to( fiber const&);
+};
+
+}}
+
+# if defined(BOOST_MSVC)
+# pragma warning(pop)
+# endif
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_DEFAULT_SCHEDULER_WS_H
diff --git a/src/round_robin.cpp b/src/round_robin.cpp
index b7fe52a5..df48f684 100644
--- a/src/round_robin.cpp
+++ b/src/round_robin.cpp
@@ -86,7 +86,7 @@ round_robin::run()
if ( f->interruption_requested() )
f->set_ready();
if ( f->is_ready() )
- rqueue_.push( f);
+ rqueue_.push_back( f);
else wqueue.push_back( s);
}
// exchange local with global waiting queue
@@ -97,7 +97,9 @@ round_robin::run()
detail::fiber_base::ptr_t f;
do
{
- if ( ! rqueue_.try_pop( f) ) return false;
+ if ( rqueue_.empty() ) return false;
+ f.swap( rqueue_.front() );
+ rqueue_.pop_front();
if ( f->is_ready() ) break;
else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue");
}
@@ -210,22 +212,6 @@ round_robin::priority( detail::fiber_base::ptr_t const& f, int prio)
f->priority( prio);
}
-fiber
-round_robin::steal_from()
-{
- detail::fiber_base::ptr_t f;
- if ( ! rqueue_.try_pop( f) ) return fiber();
- return fiber( f);
-}
-
-void
-round_robin::migrate_to( fiber const& f)
-{
- BOOST_ASSERT( f);
-
- spawn( detail::scheduler::extract( f) );
-}
-
}}
#ifdef BOOST_HAS_ABI_HEADERS
diff --git a/src/round_robin_ws.cpp b/src/round_robin_ws.cpp
new file mode 100644
index 00000000..9de16028
--- /dev/null
+++ b/src/round_robin_ws.cpp
@@ -0,0 +1,233 @@
+
+// Copyright Oliver Kowalke 2013.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/fiber/round_robin_ws.hpp"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "boost/fiber/detail/scheduler.hpp"
+#include "boost/fiber/exceptions.hpp"
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+
+round_robin_ws::round_robin_ws() BOOST_NOEXCEPT :
+ active_fiber_(),
+ wqueue_(),
+ rqueue_()
+{}
+
+round_robin_ws::~round_robin_ws() BOOST_NOEXCEPT
+{
+ // fibers will be destroyed (stack-unwinding)
+ // if last reference goes out-of-scope
+ // therefore destructing wqueue_ && rqueue_
+ // will destroy the fibers in this scheduler
+ // if not referenced on other places
+ if ( detail::scheduler::instance() == this)
+ detail::scheduler::replace( 0);
+}
+
+void
+round_robin_ws::spawn( detail::fiber_base::ptr_t const& f)
+{
+ BOOST_ASSERT( f);
+ BOOST_ASSERT( f->is_ready() );
+
+ // store active fiber in local var
+ detail::fiber_base::ptr_t tmp = active_fiber_;
+ // assign new fiber to active fiber
+ active_fiber_ = f;
+ // set active fiber to state_running
+ active_fiber_->set_running();
+ // resume active fiber
+ active_fiber_->resume();
+ // fiber is resumed
+
+ BOOST_ASSERT( f == active_fiber_);
+ // reset active fiber to previous
+ active_fiber_ = tmp;
+}
+
+bool
+round_robin_ws::run()
+{
+ wqueue_t wqueue;
+ // move all fibers witch are ready (state_ready)
+ // from waiting-queue to the runnable-queue
+ BOOST_FOREACH( schedulable const& s, wqueue_)
+ {
+ detail::fiber_base::ptr_t f( s.f);
+
+ BOOST_ASSERT( ! f->is_running() );
+ BOOST_ASSERT( ! f->is_terminated() );
+
+ // set fiber to state_ready if dead-line was reached
+ if ( s.tp <= clock_type::now() )
+ f->set_ready();
+ // set fiber to state_ready if interruption was requested
+ if ( f->interruption_requested() )
+ f->set_ready();
+ if ( f->is_ready() )
+ rqueue_.push( f);
+ else wqueue.push_back( s);
+ }
+ // exchange local with global waiting queue
+ wqueue_.swap( wqueue);
+
+ // pop new fiber from ready-queue which is not complete
+ // (example: fiber in ready-queue could be canceled by active-fiber)
+ detail::fiber_base::ptr_t f;
+ do
+ {
+ if ( ! rqueue_.try_pop( f) ) return false;
+ if ( f->is_ready() ) break;
+ else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue");
+ }
+ while ( true);
+
+ // resume fiber
+ spawn( f);
+
+ return true;
+}
+
+void
+round_robin_ws::wait( unique_lock< detail::spinlock > & lk)
+{ wait_until( clock_type::time_point( (clock_type::duration::max)() ), lk); }
+
+bool
+round_robin_ws::wait_until( clock_type::time_point const& timeout_time,
+ unique_lock< detail::spinlock > & lk)
+{
+ clock_type::time_point start( clock_type::now() );
+
+ BOOST_ASSERT( active_fiber_);
+ BOOST_ASSERT( active_fiber_->is_running() );
+
+ // set active fiber to state_waiting
+ active_fiber_->set_waiting();
+ // release lock
+ lk.unlock();
+ // push active fiber to wqueue_
+ wqueue_.push_back( schedulable( active_fiber_, timeout_time) );
+ // store active fiber in local var
+ detail::fiber_base::ptr_t tmp = active_fiber_;
+ // suspend active fiber
+ active_fiber_->suspend();
+ // fiber is resumed
+
+ BOOST_ASSERT( tmp == detail::scheduler::instance()->active() );
+ BOOST_ASSERT( tmp->is_running() );
+
+ return clock_type::now() < timeout_time;
+}
+
+void
+round_robin_ws::yield()
+{
+ BOOST_ASSERT( active_fiber_);
+ BOOST_ASSERT( active_fiber_->is_running() );
+
+ // set active fiber to state_waiting
+ active_fiber_->set_ready();
+ // push active fiber to wqueue_
+ wqueue_.push_back( schedulable( active_fiber_) );
+ // store active fiber in local var
+ detail::fiber_base::ptr_t tmp = active_fiber_;
+ // suspend acitive fiber
+ active_fiber_->suspend();
+ // fiber is resumed
+
+ BOOST_ASSERT( tmp == detail::scheduler::instance()->active() );
+ BOOST_ASSERT( tmp->is_running() );
+}
+
+void
+round_robin_ws::join( detail::fiber_base::ptr_t const& f)
+{
+ BOOST_ASSERT( f);
+ BOOST_ASSERT( f != active_fiber_);
+
+ if ( active_fiber_)
+ {
+ // set active fiber to state_waiting
+ active_fiber_->set_waiting();
+ // push active fiber to wqueue_
+ wqueue_.push_back( schedulable( active_fiber_) );
+ // add active fiber to joinig-list of f
+ if ( ! f->join( active_fiber_) )
+ // f must be already terminated therefore we set
+ // active fiber to state_ready
+ // FIXME: better state_running and no suspend
+ active_fiber_->set_ready();
+ // store active fiber in local var
+ detail::fiber_base::ptr_t tmp = active_fiber_;
+ // suspend fiber until f terminates
+ active_fiber_->suspend();
+ // fiber is resumed by f
+
+ BOOST_ASSERT( tmp == detail::scheduler::instance()->active() );
+ BOOST_ASSERT( tmp->is_running() );
+ }
+ else
+ {
+ while ( ! f->is_terminated() )
+ {
+ // yield this thread if scheduler did not
+ // resumed some fibers in the previous round
+ if ( ! run() ) this_thread::yield();
+ }
+ }
+
+ BOOST_ASSERT( f->is_terminated() );
+}
+
+void
+round_robin_ws::priority( detail::fiber_base::ptr_t const& f, int prio)
+{
+ BOOST_ASSERT( f);
+
+ // set only priority to fiber
+ // round-robin does not respect priorities
+ f->priority( prio);
+}
+
+fiber
+round_robin_ws::steal_from()
+{
+ detail::fiber_base::ptr_t f;
+ if ( ! rqueue_.try_pop( f) ) return fiber();
+ return fiber( f);
+}
+
+void
+round_robin_ws::migrate_to( fiber const& f)
+{
+ BOOST_ASSERT( f);
+
+ spawn( detail::scheduler::extract( f) );
+}
+
+}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
diff --git a/test/test_migration.cpp b/test/test_migration.cpp
index 5b9205b5..e9d9534e 100644
--- a/test/test_migration.cpp
+++ b/test/test_migration.cpp
@@ -21,7 +21,7 @@
#include
boost::atomic< bool > fini( false);
-boost::fibers::algorithm * other_ds = 0;
+boost::fibers::round_robin_ws * other_ds = 0;
boost::fibers::future< int > fibonacci( int);
@@ -55,7 +55,7 @@ int create_fiber( int n)
return fibonacci( n).get();
}
-void fn_create_fibers( boost::fibers::round_robin * ds, boost::barrier * b)
+void fn_create_fibers( boost::fibers::round_robin_ws * ds, boost::barrier * b)
{
boost::fibers::set_scheduling_algorithm( ds);
@@ -68,10 +68,10 @@ void fn_create_fibers( boost::fibers::round_robin * ds, boost::barrier * b)
fini = true;
}
-void fn_migrate_fibers( boost::fibers::round_robin * other_ds, boost::barrier * b, int * count)
+void fn_migrate_fibers( boost::fibers::round_robin_ws * other_ds, boost::barrier * b, int * count)
{
BOOST_ASSERT( other_ds);
- boost::fibers::round_robin ds;
+ boost::fibers::round_robin_ws ds;
boost::fibers::set_scheduling_algorithm( & ds);
b->wait();
@@ -97,7 +97,7 @@ void test_migrate_fiber()
fini = false;
int count = 0;
- boost::fibers::round_robin * ds = new boost::fibers::round_robin();
+ boost::fibers::round_robin_ws * ds = new boost::fibers::round_robin_ws();
boost::barrier b( 2);
boost::thread t1( boost::bind( fn_create_fibers, ds, &b) );
boost::thread t2( boost::bind( fn_migrate_fibers, ds, &b, &count) );