2
0
mirror of https://github.com/boostorg/thread.git synced 2026-02-08 23:22:13 +00:00

Compare commits

...

33 Commits

Author SHA1 Message Date
Vicente J. Botet Escriba
bde5a1ef55 merge from develop. 2015-05-28 19:59:13 +02:00
Vicente J. Botet Escriba
a741bd1bba Merge branch 'develop' into fix/make_executors_copyable 2015-04-18 07:11:12 +02:00
Vicente J. Botet Escriba
caaa7b4cc2 store executor by value. 2015-03-08 23:30:41 +01:00
Vicente J. Botet Escriba
9a05211faa fix issue with c++03 compilers. Pass Executors by const& instead of by &. 2015-03-04 07:59:27 +01:00
Vicente J. Botet Escriba
7ffcec448c uncomment more tests. 2015-03-03 19:29:50 +01:00
Vicente J. Botet Escriba
62bffed368 More fixes to make executor copyable. 2015-03-03 08:27:17 +01:00
Vicente J. Botet Escriba
5a1de7a722 ensure that generic executors are copyable. 2015-03-03 00:50:48 +01:00
Vicente J. Botet Escriba
33ee3445af refactor basic_thread_pool. It doesn't works yet for at_thread_exit. Needs to replace function<void(basic_thread_pool)>. 2015-03-01 18:00:58 +01:00
Vicente J. Botet Escriba
8511771816 Merge branch 'develop' into fix/make_executors_copyable 2015-03-01 01:40:23 +01:00
Vicente J. Botet Escriba
7dbd04197d Make scheduled_thread_pool copyable. 2015-02-28 19:01:45 +01:00
Vicente J. Botet Escriba
a53f31fb99 Store the Executor in scheduling_adaptor. This class must be finished as it doesn't make use of the executor :(. 2015-02-28 17:07:57 +01:00
Vicente J. Botet Escriba
b2b8684d0c make scheduled_thread_pool design closer to basic_thread_pool. 2015-02-28 17:04:17 +01:00
Vicente J. Botet Escriba
df14c8ac18 fix the move(w) on scheduler and store copies of the Executors. 2015-02-28 16:29:00 +01:00
Vicente J. Botet Escriba
6e5a46c16f merge from develop. 2015-02-28 15:32:18 +01:00
Vicente J. Botet Escriba
264ed4c308 move the work parameter. 2015-02-28 10:44:44 +01:00
Vicente J. Botet Escriba
65c4693c87 Add missing push(movable&&) and Run some failing tests that work when BOOST_NO_CXX11_RVALUE_REFERENCES is not defined. 2015-02-28 10:41:20 +01:00
Vicente J. Botet Escriba
05d6eca09d Run some failing tests that work when BOOST_NO_CXX11_RVALUE_REFERENCES is not defined. 2015-02-28 10:38:15 +01:00
Vicente J. Botet Escriba
c192777aef fix missing include in caller_context.hpp and let the possibility to dump function at compile time. 2015-02-28 09:53:09 +01:00
Vicente J. Botet Escriba
fdd1db970d cleanup work and store by value scheduler. 2015-02-28 09:06:57 +01:00
Vicente J. Botet Escriba
3bc5fb1725 fix a lot of things for c++11 compilers. There is yet a lot to do :( 2015-02-26 08:16:11 +01:00
Vicente J. Botet Escriba
9481562b5c update executors doc removing the move-only constraint. 2015-02-21 16:51:55 +01:00
Vicente J. Botet Escriba
e44b5309ae rename serial_executors to generic_serial_executors and let serial_executor be the template form. 2015-02-21 16:17:11 +01:00
Vicente J. Botet Escriba
eecf8f6c36 Allow polymorphic executors to be copiable. 2015-02-21 14:29:51 +01:00
Vicente J. Botet Escriba
532d215de9 Make serial_executor_cont copyable, and fix it: reschedule_until and try_executing_one must return false, as a serial executor can not re-enter. 2015-02-21 12:26:40 +01:00
Vicente J. Botet Escriba
71bce54c71 fix serial_exeutor: reschedule_until and try_executing_one must return false, as a serial executor can not re-enter. 2015-02-21 12:25:29 +01:00
Vicente J. Botet Escriba
41bde57707 Make scheduler copyable. 2015-02-21 11:21:20 +01:00
Vicente J. Botet Escriba
ff7e394084 remove last sleep as now the tasks block the executors shared state lifetime as it is copied. 2015-02-21 11:20:42 +01:00
Vicente J. Botet Escriba
81f67eeb54 Change copyright date. 2015-02-21 11:18:08 +01:00
Vicente J. Botet Escriba
a4827a31f3 Change copyright date. 2015-02-21 11:16:19 +01:00
Vicente J. Botet Escriba
cd31e9c34f Make executor_adaptor copyable. 2015-02-21 01:00:12 +01:00
Vicente J. Botet Escriba
9492bcd485 Make serial_executor copyable. Replace generic_executor_ref by generic_executor. 2015-02-20 22:26:12 +01:00
Vicente J. Botet Escriba
ff9457e79c make basic_thread_pool copyable. 2015-02-20 20:47:30 +01:00
Vicente J. Botet Escriba
de580474a3 make inline_executor, loop_executor and thread_executor copyable. 2015-02-20 19:11:08 +01:00
29 changed files with 2723 additions and 715 deletions

View File

@@ -1,5 +1,5 @@
[/ [/
/ Copyright (c) 2014 Vicente J. Botet Escriba / Copyright (c) 2014-2015 Vicente J. Botet Escriba
/ /
/ Distributed under the Boost Software License, Version 1.0. (See accompanying / Distributed under the Boost Software License, Version 1.0. (See accompanying
/ file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) / file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -174,7 +174,7 @@ This has several advantages:
* The scheduled operations are available for all the executors via wrappers. * The scheduled operations are available for all the executors via wrappers.
* The template functions could accept any chrono `time_point` and `duration` respectively as we are not working with virtual functions. * The template functions could accept any chrono `time_point` and `duration` respectively as we are not working with virtual functions.
In order to manage with all the clocks, this library propose generic solution. `scheduler<Clock>` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ. In order to manage with all the clocks, this library propose a generic solution. `scheduler<Clock>` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ.
[heading Not Handled Exceptions] [heading Not Handled Exceptions]
As in N3785 and based on the same design decision than `std`/`boost::thread` if a user closure throws an exception, the executor must call the `std::terminate` function. As in N3785 and based on the same design decision than `std`/`boost::thread` if a user closure throws an exception, the executor must call the `std::terminate` function.
@@ -506,9 +506,6 @@ Executor abstract base class.
public: public:
typedef boost::work work; typedef boost::work work;
executor(executor const&) = delete;
executor& operator=(executor const&) = delete;
executor(); executor();
virtual ~executor() {}; virtual ~executor() {};
@@ -573,9 +570,6 @@ Polymorphic adaptor of a model of Executor to an executor.
public: public:
typedef executor::work work; typedef executor::work work;
executor_adaptor(executor_adaptor const&) = delete;
executor_adaptor& operator=(executor_adaptor const&) = delete;
template <typename ...Args> template <typename ...Args>
executor_adaptor(Args&& ... args); executor_adaptor(Args&& ... args);
@@ -641,19 +635,23 @@ Polymorphic adaptor of a model of Executor to an executor.
[/////////////////////////////////] [/////////////////////////////////]
[section:generic_executor_ref Class `generic_executor_ref`] [section:generic_executor_ref Class `generic_executor_ref`]
Executor abstract base class. Type erased executor class.
#include <boost/thread/generic_executor_ref.hpp> #include <boost/thread/generic_executor_ref.hpp>
namespace boost { namespace boost {
class generic_executor_ref class generic_executor_ref
{ {
public: public:
generic_executor_ref(generic_executor_ref const&);
generic_executor_ref& operator=(generic_executor_ref const&);
template <class Executor> template <class Executor>
generic_executor_ref(Executor& ex); generic_executor_ref(Executor& ex);
generic_executor_ref() {}; generic_executor_ref() = delete;
generic_executor_ref(generic_executor_ref const&) = default;
generic_executor_ref(generic_executor_ref &&) = default;
generic_executor_ref& operator=(generic_executor_ref const&) = default;
generic_executor_ref& operator=(generic_executor_ref &&) = default;
executor& underlying_executor() noexcept;
void close() = 0; void close() = 0;
bool closed() = 0; bool closed() = 0;
@@ -669,6 +667,44 @@ Executor abstract base class.
[endsect] [endsect]
[/////////////////////////////////]
[section:generic_executor Class `generic_executor`]
Type erased executor class.
#include <boost/thread/generic_executor.hpp>
namespace boost {
class generic_executor
{
public:
template <class Executor>
generic_executor(Executor& ex);
generic_executor() = delete;
generic_executor(generic_executor const&) = default;
generic_executor(generic_executor &&) = default;
generic_executor& operator=(generic_executor const&) = default;
generic_executor& operator=(generic_executor &&) = default;
executor& underlying_executor() noexcept;
void close() = 0;
bool closed() = 0;
template <typename Closure>
void submit(Closure&& closure);
virtual bool try_executing_one() = 0;
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[endsect]
[//////////////////////////////////////////////////////////] [//////////////////////////////////////////////////////////]
[section: scheduler Template Class `scheduler `] [section: scheduler Template Class `scheduler `]
@@ -684,9 +720,6 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec
using work = boost::function<void()> ; using work = boost::function<void()> ;
using clock = Clock; using clock = Clock;
scheduler(scheduler const&) = delete;
scheduler& operator=(scheduler const&) = delete;
scheduler(); scheduler();
~scheduler(); ~scheduler();
@@ -733,7 +766,7 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec
[[Effects:] [Destroys the scheduler.]] [[Effects:] [Destroys the scheduler.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]] [[Synchronization:] [The completion of all the closures happen before the completion of the destructor.]]
] ]
@@ -1339,10 +1372,6 @@ A serial executor ensuring that there are no two work units that executes concur
class serial_executor class serial_executor
{ {
public: public:
serial_executor(serial_executor const&) = delete;
serial_executor& operator=(serial_executor const&) = delete;
template <class Executor>
serial_executor(Executor& ex); serial_executor(Executor& ex);
Executor& underlying_executor() noexcept; Executor& underlying_executor() noexcept;
@@ -1393,7 +1422,7 @@ A serial executor ensuring that there are no two work units that executes concur
[/////////////////////////////////////] [/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`] [section:underlying_executor Function member `underlying_executor()`]
generic_executor_ref& underlying_executor() noexcept; Executor& underlying_executor() noexcept;
[variablelist [variablelist
@@ -1418,13 +1447,10 @@ A serial executor ensuring that there are no two work units that executes concur
class generic_serial_executor class generic_serial_executor
{ {
public: public:
generic_serial_executor(generic_serial_executor const&) = delete;
generic_serial_executor& operator=(generic_serial_executor const&) = delete;
template <class Executor> template <class Executor>
generic_serial_executor(Executor& ex); generic_serial_executor(Executor& ex);
generic_executor_ref& underlying_executor() noexcept; generic_executor& underlying_executor() noexcept;
void close(); void close();
bool closed(); bool closed();
@@ -1433,6 +1459,7 @@ A serial executor ensuring that there are no two work units that executes concur
void submit(Closure&& closure); void submit(Closure&& closure);
bool try_executing_one(); bool try_executing_one();
template <typename Pred> template <typename Pred>
bool reschedule_until(Pred const& pred); bool reschedule_until(Pred const& pred);
@@ -1456,7 +1483,7 @@ A serial executor ensuring that there are no two work units that executes concur
[endsect] [endsect]
[/////////////////////////////////////] [/////////////////////////////////////]
[section:destructor Destructor `~serial_executor()`] [section:destructor Destructor `~generic_serial_executor()`]
~generic_serial_executor(); ~generic_serial_executor();
@@ -1468,6 +1495,80 @@ A serial executor ensuring that there are no two work units that executes concur
] ]
[endsect]
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
generic_executor& underlying_executor() noexcept;
[variablelist
[[Return:] [The underlying executor instance. ]]
]
[endsect]
[endsect]
[//////////////////////////////////////////////////////////]
[section:serial_executor_cont Template Class `serial_executor_cont`]
A serial executor ensuring that there are no two work units that executes concurrently.
#include <boost/thread/serial_executor_cont.hpp>
namespace boost {
template <class Executor>
class serial_executor_cont
{
public:
serial_executor_cont(Executor& ex);
Executor& underlying_executor() noexcept;
void close();
bool closed();
template <typename Closure>
void submit(Closure&& closure);
bool try_executing_one();
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[/////////////////////////////////////]
[section:constructor Constructor `serial_executor_cont(Executor&)`]
template <class Executor>
serial_executor_cont(Executor& ex);
[variablelist
[[Effects:] [Constructs a serial_executor_cont. ]]
[[Throws:] [Nothing. ]]
]
[endsect]
[/////////////////////////////////////]
[section:destructor Destructor `~serial_executor_cont()`]
~serial_executor_cont();
[variablelist
[[Effects:] [Destroys the serial_executor.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]]
]
[endsect] [endsect]
[/////////////////////////////////////] [/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`] [section:underlying_executor Function member `underlying_executor()`]
@@ -1478,6 +1579,8 @@ A serial executor ensuring that there are no two work units that executes concur
[[Return:] [The underlying executor instance. ]] [[Return:] [The underlying executor instance. ]]
[[Throws:] [Nothing.]]
] ]
@@ -1485,6 +1588,82 @@ A serial executor ensuring that there are no two work units that executes concur
[endsect] [endsect]
[//////////////////////////////////////////////////////////]
[section:generic_serial_cont_executor Class `generic_serial_cont_executor`]
A serial executor ensuring that there are no two work units that executes concurrently.
#include <boost/thread/generic_serial_cont_executor.hpp>
namespace boost {
class generic_serial_cont_executor
{
public:
template <class Executor>
generic_serial_cont_executor(Executor& ex);
generic_executor& underlying_executor() noexcept;
void close();
bool closed();
template <typename Closure>
void submit(Closure&& closure);
bool try_executing_one();
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[/////////////////////////////////////]
[section:constructor Constructor `generic_serial_cont_executor(Executor&)`]
template <class Executor>
generic_serial_cont_executor(Executor& ex);
[variablelist
[[Effects:] [Constructs a generic_serial_cont_executor. ]]
[[Throws:] [Nothing. ]]
]
[endsect]
[/////////////////////////////////////]
[section:destructor Destructor `~generic_serial_cont_executor()`]
~generic_serial_cont_executor();
[variablelist
[[Effects:] [Destroys the generic_serial_cont_executor.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]]
]
[endsect]
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
generic_executor& underlying_executor() noexcept;
[variablelist
[[Return:] [The underlying executor instance. ]]
]
[endsect]
[endsect]
[//////////////////////////////////////////////////////////] [//////////////////////////////////////////////////////////]
[section:inline_executor Class `inline_executor`] [section:inline_executor Class `inline_executor`]
@@ -1496,10 +1675,8 @@ A serial executor ensuring that there are no two work units that executes concur
class inline_executor class inline_executor
{ {
public: public:
inline_executor(inline_executor const&) = delete;
inline_executor& operator=(inline_executor const&) = delete;
inline_executor(); inline_executor();
~inline_executor();
void close(); void close();
bool closed(); bool closed();
@@ -1561,9 +1738,6 @@ A thread pool with up to a fixed number of threads.
{ {
public: public:
basic_thread_pool(basic_thread_pool const&) = delete;
basic_thread_pool& operator=(basic_thread_pool const&) = delete;
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()); basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency());
template <class AtThreadEntry> template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry); basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry);
@@ -1623,9 +1797,6 @@ A thread_executor with a threads for each task.
{ {
public: public:
thread_executor(thread_executor const&) = delete;
thread_executor& operator=(thread_executor const&) = delete;
thread_executor(); thread_executor();
template <class AtThreadEntry> template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry); basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry);
@@ -1681,9 +1852,6 @@ A user scheduled executor.
{ {
public: public:
loop_executor(loop_executor const&) = delete;
loop_executor& operator=(loop_executor const&) = delete;
loop_executor(); loop_executor();
~loop_executor(); ~loop_executor();

View File

@@ -17,7 +17,7 @@
#include <boost/thread/caller_context.hpp> #include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp> #include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp> #include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp> #include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp> #include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp> #include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp> #include <boost/thread/executors/executor.hpp>
@@ -71,7 +71,7 @@ void submit_some(boost::executor& tp)
} }
void at_th_entry(boost::basic_thread_pool& ) void at_th_entry(boost::basic_thread_pool )
{ {
} }
@@ -82,6 +82,10 @@ int test_executor_adaptor()
{ {
try try
{ {
{
boost::basic_thread_pool e1;
boost::basic_thread_pool e2 = e1;
}
{ {
boost::executor_adaptor < boost::basic_thread_pool > ea(4); boost::executor_adaptor < boost::basic_thread_pool > ea(4);
submit_some( ea); submit_some( ea);
@@ -104,25 +108,65 @@ int test_executor_adaptor()
submit_some(ea); submit_some(ea);
} }
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::loop_executor e1;
boost::loop_executor e2 = e1;
boost::executor_adaptor < boost::loop_executor > ea2(e2);
submit_some( ea2);
ea2.underlying_executor().run_queued_closures();
}
{ {
boost::executor_adaptor < boost::loop_executor > ea2; boost::executor_adaptor < boost::loop_executor > ea2;
submit_some( ea2); submit_some( ea2);
ea2.underlying_executor().run_queued_closures(); ea2.underlying_executor().run_queued_closures();
} }
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{ {
boost::executor_adaptor < boost::basic_thread_pool > ea1(4); boost::basic_thread_pool tp;
boost::executor_adaptor < boost::serial_executor > ea2(ea1); boost::generic_serial_executor e1(tp);
boost::generic_serial_executor e2 = e1;
}
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2);
submit_some(ea3);
}
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2);
submit_some(ea3);
}
//#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
{
boost::basic_thread_pool ea1(4);
boost::executor_adaptor < boost::generic_serial_executor > ea2(ea1);
submit_some(ea2); submit_some(ea2);
} }
#endif //#endif
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::inline_executor e1;
boost::inline_executor e2 = e1;
boost::executor_adaptor < boost::inline_executor > ea2(e2);
submit_some(ea2);
}
{ {
boost::executor_adaptor < boost::inline_executor > ea1; boost::executor_adaptor < boost::inline_executor > ea1;
submit_some(ea1); submit_some(ea1);
} }
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::thread_executor e1;
boost::thread_executor e2 = e1;
}
{
boost::thread_executor e1;
boost::executor_adaptor < boost::generic_executor > ea2(e1);
submit_some(ea2);
}
{ {
boost::executor_adaptor < boost::thread_executor > ea1; boost::executor_adaptor < boost::thread_executor > ea1;
submit_some(ea1); submit_some(ea1);

View File

@@ -0,0 +1,184 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
//boost::this_thread::sleep_for(boost::chrono::milliseconds(200));
}
void p2()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
//boost::this_thread::sleep_for(boost::chrono::seconds(10));
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_executor tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
template < class Executor>
void submit_some2(Executor& tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
template <class Executor>
void submit_some3(boost::serial_executor<Executor>& tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
void at_th_entry(boost::basic_thread_pool)
{
}
int test_generic_executor()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
{
boost::basic_thread_pool ea(4);
submit_some( ea);
{
boost::future<int> t1 = boost::async(ea, &f1);
boost::future<int> t2 = boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
submit_some(ea);
{
boost::basic_thread_pool ea3(1);
boost::future<int> t1 = boost::async(ea3, &f1);
boost::future<int> t2 = boost::async(ea3, &f1);
//boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
//boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
submit_some(ea);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::loop_executor ea2;
submit_some( ea2);
ea2.run_queued_closures();
}
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// // std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea1(4);
// boost::generic_serial_executor ea2(ea1);
// submit_some(ea2);
// }
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::serial_executor<boost::basic_thread_pool> ea2(ea1);
submit_some3(ea2);
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::inline_executor ea1;
submit_some(ea1);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
//boost::thread_executor ea1;
//submit_some(ea1);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea(4, at_th_entry);
boost::future<int> t1 = boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
}
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_generic_executor();
}

View File

@@ -17,11 +17,12 @@
#include <boost/thread/caller_context.hpp> #include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp> #include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp> #include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp> #include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp> #include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp> #include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp> #include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp> #include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/executor.hpp> #include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp> #include <boost/thread/future.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
@@ -64,7 +65,7 @@ void submit_some(boost::generic_executor_ref tp)
} }
void at_th_entry(boost::basic_thread_pool& ) void at_th_entry(boost::basic_thread_pool)
{ {
} }
@@ -108,7 +109,7 @@ int test_generic_executor_ref()
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{ {
boost::basic_thread_pool ea1(4); boost::basic_thread_pool ea1(4);
boost::serial_executor ea2(ea1); boost::generic_serial_executor ea2(ea1);
submit_some(ea2); submit_some(ea2);
} }
#endif #endif

View File

@@ -0,0 +1,117 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(30));
std::cout << BOOST_CONTEXTOF << std::endl;
}
void p2()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));
std::cout << BOOST_CONTEXTOF << std::endl;
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_serial_executor& tp)
{
//std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1);
}
//std::cout << BOOST_CONTEXTOF << std::endl;
}
void at_th_entry(boost::basic_thread_pool )
{
}
int test_executor_adaptor()
{
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool tp;
boost::generic_serial_executor e1(tp);
boost::generic_serial_executor e2 = e1;
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
submit_some(ea2);
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_executor_adaptor();
}

View File

@@ -0,0 +1,118 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/generic_serial_executor_cont.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(30));
std::cout << BOOST_CONTEXTOF << std::endl;
}
void p2()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));
std::cout << BOOST_CONTEXTOF << std::endl;
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_serial_executor_cont& tp)
{
//std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1);
}
//std::cout << BOOST_CONTEXTOF << std::endl;
}
void at_th_entry(boost::basic_thread_pool)
{
}
int test_executor_adaptor()
{
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool tp;
boost::generic_serial_executor_cont e1(tp);
boost::generic_serial_executor_cont e2 = e1;
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor_cont ea2(ea1);
submit_some(ea2);
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_executor_adaptor();
}

View File

@@ -51,8 +51,8 @@ int f2(int i)
boost::this_thread::sleep_for(boost::chrono::seconds(2)); boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1; return i + 1;
} }
template <class Executor>
void submit_some(boost::serial_executor& tp) void submit_some(boost::serial_executor<Executor>& tp)
{ {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
std::cout << BOOST_CONTEXTOF << std::endl; std::cout << BOOST_CONTEXTOF << std::endl;
@@ -80,9 +80,8 @@ int test_executor_adaptor()
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
{ {
boost::basic_thread_pool ea1(4); boost::basic_thread_pool ea1(4);
boost::serial_executor ea2(ea1); boost::serial_executor<boost::basic_thread_pool> ea2(ea1);
submit_some(ea2); submit_some(ea2);
boost::this_thread::sleep_for(boost::chrono::seconds(10));
} }
#endif #endif
} }

View File

@@ -52,18 +52,19 @@ int f2(int i)
return i + 1; return i + 1;
} }
void submit_some(boost::serial_executor_cont& tp) template < class Executor>
void submit_some(boost::serial_executor_cont<Executor>& tp)
{ {
std::cout << BOOST_CONTEXTOF << std::endl; //std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
std::cout << BOOST_CONTEXTOF << std::endl; //std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2); tp.submit(&p2);
} }
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
std::cout << BOOST_CONTEXTOF << std::endl; //std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1); tp.submit(&p1);
} }
std::cout << BOOST_CONTEXTOF << std::endl; //std::cout << BOOST_CONTEXTOF << std::endl;
} }
@@ -84,9 +85,8 @@ int test_executor_adaptor()
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;
{ {
boost::basic_thread_pool ea1(4); boost::basic_thread_pool ea1(4);
boost::serial_executor_cont ea2(ea1); boost::serial_executor_cont<boost::basic_thread_pool> ea2(ea1);
submit_some(ea2); submit_some(ea2);
boost::this_thread::sleep_for(boost::chrono::seconds(10));
} }
#endif #endif
// std::cout << BOOST_CONTEXTOF << std::endl; // std::cout << BOOST_CONTEXTOF << std::endl;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013-2014 Vicente J. Botet Escriba // Copyright (C) 2013-2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -18,6 +18,11 @@
#include <boost/thread/executors/work.hpp> #include <boost/thread/executors/work.hpp>
#include <boost/thread/csbl/vector.hpp> #include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/smart_ptr/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
@@ -30,14 +35,23 @@ namespace executors
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
private: private:
typedef thread thread_t;
struct shared_state : enable_shared_from_this<shared_state> {
typedef executors::work work;
/// the kind of stored threads are scoped threads to ensure that the threads are joined.
/// A move aware vector type /// A move aware vector type
//typedef scoped_thread<> thread_t;
typedef thread thread_t;
typedef csbl::vector<thread_t> thread_vector; typedef csbl::vector<thread_t> thread_vector;
/// A move aware vector
thread_vector threads;
/// the thread safe work queue /// the thread safe work queue
concurrent::sync_queue<work > work_queue; concurrent::sync_queue<work > work_queue;
/// A move aware vector
thread_vector threads;
unsigned const thread_count;
boost::function<void(basic_thread_pool)> at_thread_entry;
friend class basic_thread_pool;
public: public:
/** /**
@@ -81,13 +95,15 @@ namespace executors
*/ */
void worker_thread() void worker_thread()
{ {
// fixme: this call results on segmentation fault
//at_thread_entry(basic_thread_pool(this->shared_from_this()));
try try
{ {
for(;;) for(;;)
{ {
work task; work task;
queue_op_status st = work_queue.wait_pull(task); queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return; if (st == queue_op_status::closed) break;
task(); task();
} }
} }
@@ -97,49 +113,18 @@ namespace executors
return; return;
} }
} }
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry>
void worker_thread1(AtThreadEntry& at_thread_entry)
{
at_thread_entry(*this);
worker_thread();
}
#endif
void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
{
at_thread_entry(*this);
worker_thread();
}
template <class AtThreadEntry>
void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
{
at_thread_entry(*this);
worker_thread();
}
static void do_nothing_at_thread_entry(basic_thread_pool&) {}
public: static void do_nothing_at_thread_entry(basic_thread_pool) {}
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
/** void init()
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
{ {
try try
{ {
threads.reserve(thread_count); threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i) for (unsigned i = 0; i < thread_count; ++i)
{ {
#if 1 thread th (&shared_state::worker_thread, this);
thread th (&basic_thread_pool::worker_thread, this);
threads.push_back(thread_t(boost::move(th))); threads.push_back(thread_t(boost::move(th)));
#else
threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
#endif
} }
} }
catch (...) catch (...)
@@ -148,81 +133,57 @@ namespace executors
throw; throw;
} }
} }
public:
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
shared_state(unsigned const thread_count = thread::hardware_concurrency()+1)
: thread_count(thread_count),
at_thread_entry(do_nothing_at_thread_entry)
{
}
/** /**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads * \b Effects: creates a thread pool that runs closures on \c thread_count threads
* and executes the at_thread_entry function at the entry of each created thread. . * and executes the at_thread_entry function at the entry of each created thread. .
* *
* \b Throws: Whatever exception is thrown while initializing the needed resources. * \b Throws: Whatever exception is thrown while initializing the needed resources.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry> template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry) shared_state( unsigned const thread_count, AtThreadEntry& at_thread_entry)
: thread_count(thread_count),
at_thread_entry(at_thread_entry)
{ {
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
} }
} #endif
catch (...) shared_state( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool))
: thread_count(thread_count),
at_thread_entry(at_thread_entry)
{ {
close();
throw;
}
}
#endif
basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
{
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
}
}
catch (...)
{
close();
throw;
}
} }
template <class AtThreadEntry> template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) shared_state( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
: thread_count(thread_count),
at_thread_entry(boost::move(at_thread_entry))
{ {
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
}
}
catch (...)
{
close();
throw;
}
} }
/** /**
* \b Effects: Destroys the thread pool. * \b Effects: Destroys the thread pool.
* *
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor. * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/ */
~basic_thread_pool() ~shared_state()
{ {
// signal to all the worker threads that there will be no more submissions. // signal to all the worker threads that there will be no more submissions.
close(); close();
// joins all the threads before destroying the thread pool resources (e.g. the queue).
join(); join();
// joins all the threads as the threads were scoped_threads
} }
/** /**
@@ -232,6 +193,7 @@ namespace executors
{ {
for (unsigned i = 0; i < threads.size(); ++i) for (unsigned i = 0; i < threads.size(); ++i)
{ {
if (this_thread::get_id() == threads[i].get_id()) continue;
threads[i].join(); threads[i].join();
} }
} }
@@ -265,13 +227,13 @@ namespace executors
* Whatever exception that can be throw while storing the closure. * Whatever exception that can be throw while storing the closure.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
@@ -299,7 +261,149 @@ namespace executors
} while (! pred()); } while (! pred());
return true; return true;
} }
};
/**
* \b Effects: creates a thread pool with this shared state.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
friend struct shared_state;
basic_thread_pool(shared_ptr<shared_state> ptr)
: pimpl(ptr)
{
}
public:
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
: pimpl(make_shared<shared_state>(thread_count))
{
pimpl->init();
}
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads
* and executes the at_thread_entry function at the entry of each created thread. .
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
: pimpl(make_shared<shared_state>(thread_count, at_thread_entry))
{
pimpl->init();
}
#endif
basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool))
: pimpl(make_shared<shared_state>(thread_count, at_thread_entry))
{
pimpl->init();
}
template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
: pimpl(make_shared<shared_state>(thread_count, boost::forward<AtThreadEntry>(at_thread_entry)))
{
pimpl->init();
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~basic_thread_pool()
{
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
/**
* \b Effects: join all the threads.
*/
void join()
{
pimpl->join();
}
/**
* \b Effects: close the \c basic_thread_pool for submissions.
* The worker threads will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return pimpl->reschedule_until(pred);
}
void schedule_one_or_yield()
{
return pimpl->schedule_one_or_yield();
}
private:
shared_ptr<shared_state> pimpl;
}; };
} }
using executors::basic_thread_pool; using executors::basic_thread_pool;

View File

@@ -24,7 +24,6 @@ namespace detail
class priority_executor_base class priority_executor_base
{ {
public: public:
//typedef boost::function<void()> work;
typedef executors::work_pq work; typedef executors::work_pq work;
protected: protected:
typedef Queue queue_type; typedef Queue queue_type;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2014 Vicente J. Botet Escriba // Copyright (C) 2013,2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -27,8 +27,6 @@ namespace boost
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
/// executor is not copyable.
BOOST_THREAD_NO_COPYABLE(executor)
executor() {} executor() {}
/** /**
@@ -128,7 +126,6 @@ namespace boost
bool reschedule_until(Pred const& pred) bool reschedule_until(Pred const& pred)
{ {
do { do {
//schedule_one_or_yield();
if ( ! try_executing_one()) if ( ! try_executing_one())
{ {
return false; return false;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2014 Vicente J. Botet Escriba // Copyright (C) 2013,2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -30,9 +30,8 @@ namespace executors
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executor::work work; typedef executor::work work;
/// executor is not copyable. // executor_adaptor(executor_adaptor const&) = default;
BOOST_THREAD_NO_COPYABLE(executor_adaptor) // executor_adaptor(executor_adaptor &&) = default;
/** /**
* executor_adaptor constructor * executor_adaptor constructor
*/ */

View File

@@ -0,0 +1,150 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP
#define BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_executor
{
shared_ptr<executor> ex;
public:
/// type-erasure to store the works to do
typedef executors::work work;
//generic_executor(generic_executor const&) = default;
//generic_executor(generic_executor &&) = default;
template<typename Executor>
generic_executor(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_executor>,
int* >::type = (int*)0
)
//: ex(make_shared<executor_adaptor<Executor> >(ex)) // todo check why this doesn't work with C++03
: ex( new executor_adaptor<Executor>(ex) )
{
}
//generic_executor(generic_executor const& other) noexcept {}
//generic_executor& operator=(generic_executor const& other) noexcept {}
/**
* \par Effects
* Close the \c executor for submissions.
* The worker threads will work until there is no more closures to run.
*/
void close() { ex->close(); }
/**
* \par Returns
* Whether the pool is closed for submissions.
*/
bool closed() { return ex->closed(); }
void submit(BOOST_THREAD_RV_REF(work) closure)
{
ex->submit(boost::forward<work>(closure));
}
/**
* \par Requires
* \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible.
*
* \par Effects
* The specified closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads.
*
* \par Synchronization
* Completion of closure on a particular thread happens before destruction of thread's thread local variables.
*
* \par Throws
* \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
work w ((closure));
submit(boost::move(w));
}
#endif
void submit(void (*closure)())
{
work w ((closure));
submit(boost::move(w));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
work w = boost::move(closure);
submit(boost::move(w));
}
// size_t num_pending_closures() const
// {
// return ex->num_pending_closures();
// }
/**
* \par Effects
* Try to execute one task.
*
* \par Returns
* Whether a task has been executed.
*
* \par Throws
* Whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one() { return ex->try_executing_one(); }
/**
* \par Requires
* This must be called from an scheduled task.
*
* \par Effects
* reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
do {
//schedule_one_or_yield();
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
};
}
using executors::generic_executor;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -13,7 +13,8 @@
#include <boost/thread/detail/move.hpp> #include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/executor.hpp> #include <boost/thread/executors/executor.hpp>
#include <boost/shared_ptr.hpp> #include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
@@ -99,8 +100,8 @@ namespace boost
template<typename Executor> template<typename Executor>
generic_executor_ref(Executor& ex) generic_executor_ref(Executor& ex)
//: ex(make_shared<executor_ref<Executor> >(ex)) // todo check why this doesn't works with C++03 //: ex(make_shared<executor_ref<typename decay<Executor>::type> >(ex)) // todo check why this doesn't work with C++03
: ex( new executor_ref<Executor>(ex) ) : ex( new executor_ref<typename decay<Executor>::type>(ex) )
{ {
} }

View File

@@ -0,0 +1,284 @@
// Copyright (C) 2013,2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// 2013/11 Vicente J. Botet Escriba
// first implementation of a simple serial scheduler.
#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP
#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/type_traits/is_same.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/thread/caller_context.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_serial_executor
{
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
typedef scoped_thread<> thread_t;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
generic_executor ex;
thread_t thr;
struct try_executing_one_task {
work& task;
boost::promise<void> &p;
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
try {
task();
p.set_value();
} catch (...)
{
p.set_exception(current_exception());
}
}
};
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex; }
private:
/**
* The main loop of the worker thread
*/
void worker_thread()
{
try
{
for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return;
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
}
}
catch (...)
{
std::terminate();
return;
}
}
public:
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
shared_state(Executor const& ex)
: ex(ex), thr(&shared_state::worker_thread, this)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
}
/**
* \b Effects: close the \c generic_serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
work_queue.close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return work_queue.closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
work_queue.push(work(closure));
}
#endif
void submit(void (*closure)())
{
work_queue.push(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
work_queue.push(work(boost::forward<Closure>(closure)));
}
};
public:
// generic_serial_executor(generic_serial_executor const&) = default;
// generic_serial_executor(generic_serial_executor &&) = default;
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
generic_serial_executor(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_serial_executor>,
int* >::type = (int*)0)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor destructor.
*/
~generic_serial_executor()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c generic_serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::generic_serial_executor;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -0,0 +1,265 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// 2013/11 Vicente J. Botet Escriba
// first implementation of a simple serial scheduler.
#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP
#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/type_traits/is_same.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_serial_executor_cont
{
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
generic_executor ex_;
future<void> fut_; // protected by mtx_
bool closed_; // protected by mtx_
mutex mtx_;
struct continuation {
work task;
template <class X>
struct result {
typedef void type;
};
continuation(BOOST_THREAD_RV_REF(work) tsk)
: task(boost::move(tsk)) {}
void operator()(future<void> f)
{
try {
task();
} catch (...) {
std::terminate();
}
}
};
bool closed(lock_guard<mutex>&) const
{
return closed_;
}
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex_; }
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*
* \b Notes:
* * The lifetime of the associated executor must outlive the serial executor.
* * The current implementation doesn't support submission from synchronous continuation, that is,
* - the executor must execute the continuation asynchronously or
* - the continuation can not submit to this serial executor.
*/
template <class Executor>
shared_state(Executor const& ex)
: ex_(ex), fut_(make_ready_future()), closed_(false)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor_cont destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
}
/**
* \b Effects: close the \c generic_serial_executor_cont for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish.
* If the invoked closure throws an exception the \c generic_serial_executor_cont will call \c std::terminate, as is the case with threads.
*
* \b Throws: \c sync_queue_is_closed if the executor is closed.
* Whatever exception that can be throw while storing the closure.
*
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
#endif
void submit(void (*closure)())
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
}
};
public:
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
generic_serial_executor_cont(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_serial_executor_cont>,
int* >::type = (int*)0)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
*/
~generic_serial_executor_cont()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::generic_serial_executor_cont;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014 Vicente J. Botet Escriba // Copyright (C) 2014-2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -14,6 +14,9 @@
#include <boost/thread/detail/move.hpp> #include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/work.hpp> #include <boost/thread/executors/work.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
@@ -25,8 +28,14 @@ namespace executors
public: public:
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
bool closed_; bool closed_;
mutable mutex mtx_; mutable mutex mtx_;
/** /**
* Effects: try to execute one task. * Effects: try to execute one task.
* Returns: whether a task has been executed. * Returns: whether a task has been executed.
@@ -37,16 +46,15 @@ namespace executors
return false; return false;
} }
public: /// shared_state is not copyable.
/// inline_executor is not copyable. BOOST_THREAD_NO_COPYABLE(shared_state)
BOOST_THREAD_NO_COPYABLE(inline_executor)
/** /**
* \b Effects: creates a inline executor that runs closures immediately. * \b Effects: creates a inline executor that runs closures immediately.
* *
* \b Throws: Nothing. * \b Throws: Nothing.
*/ */
inline_executor() shared_state()
: closed_(false) : closed_(false)
{ {
} }
@@ -55,7 +63,7 @@ namespace executors
* *
* \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor. * \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor.
*/ */
~inline_executor() ~shared_state()
{ {
// signal to all the worker thread that there will be no more submissions. // signal to all the worker thread that there will be no more submissions.
close(); close();
@@ -74,11 +82,11 @@ namespace executors
/** /**
* \b Returns: whether the pool is closed for submissions. * \b Returns: whether the pool is closed for submissions.
*/ */
bool closed(lock_guard<mutex>& ) bool closed(lock_guard<mutex>& ) const
{ {
return closed_; return closed_;
} }
bool closed() bool closed() const
{ {
lock_guard<mutex> lk(mtx_); lock_guard<mutex> lk(mtx_);
return closed(lk); return closed(lk);
@@ -96,7 +104,7 @@ namespace executors
* Whatever exception that can be throw while storing the closure. * Whatever exception that can be throw while storing the closure.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
@@ -114,7 +122,7 @@ namespace executors
return; return;
} }
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
{ {
@@ -160,7 +168,87 @@ namespace executors
{ {
return false; return false;
} }
};
public:
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
inline_executor()
: pimpl(make_shared<shared_state>())
{
}
/**
* \b Effects: close the \c inline_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the executor is closed for submissions.
*/
bool closed() const
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& p)
{
return pimpl->reschedule_until(p);
}
private:
shared_ptr<shared_state> pimpl;
}; };
} }
using executors::inline_executor; using executors::inline_executor;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2014 Vicente J. Botet Escriba // Copyright (C) 2013-2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -17,6 +17,9 @@
#include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp> #include <boost/thread/executors/work.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
@@ -30,10 +33,12 @@ namespace executors
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
private: private:
struct shared_state {
typedef executors::work work;
/// the thread safe work queue /// the thread safe work queue
concurrent::sync_queue<work > work_queue; concurrent::sync_queue<work > work_queue;
public:
/** /**
* Effects: try to execute one task. * Effects: try to execute one task.
* Returns: whether a task has been executed. * Returns: whether a task has been executed.
@@ -57,7 +62,6 @@ namespace executors
return false; return false;
} }
} }
private:
/** /**
* Effects: schedule one task or yields * Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws. * Throws: whatever the current task constructor throws or the task() throws.
@@ -70,19 +74,15 @@ namespace executors
} }
} }
public:
/// loop_executor is not copyable. /// loop_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(loop_executor) BOOST_THREAD_NO_COPYABLE(shared_state)
/** /**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
* *
* \b Throws: Whatever exception is thrown while initializing the needed resources. * \b Throws: Whatever exception is thrown while initializing the needed resources.
*/ */
loop_executor() shared_state()
{ {
} }
/** /**
@@ -90,7 +90,7 @@ namespace executors
* *
* \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor. * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
*/ */
~loop_executor() ~shared_state()
{ {
// signal to all the worker thread that there will be no more submissions. // signal to all the worker thread that there will be no more submissions.
close(); close();
@@ -139,13 +139,13 @@ namespace executors
* Whatever exception that can be throw while storing the closure. * Whatever exception that can be throw while storing the closure.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
@@ -189,6 +189,120 @@ namespace executors
} }
}; };
public:
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
loop_executor()
: pimpl(make_shared<shared_state>())
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
*/
~loop_executor()
{
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
// /**
// * Effects: schedule one task or yields
// * Throws: whatever the current task constructor throws or the task() throws.
// */
// void schedule_one_or_yield()
// {
// return pimpl->schedule_one_or_yield();
// }
/**
* The main loop of the worker thread
*/
void loop()
{
pimpl->loop();
}
/**
* \b Effects: close the \c loop_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return pimpl->reschedule_until(pred);
}
/**
* run queued closures
*/
void run_queued_closures()
{
pimpl->run_queued_closures();
}
private:
shared_ptr<shared_state> pimpl;
};
} }
using executors::loop_executor; using executors::loop_executor;

View File

@@ -9,36 +9,123 @@
#define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP #define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP
#include <boost/thread/executors/detail/scheduled_executor_base.hpp> #include <boost/thread/executors/detail/scheduled_executor_base.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
namespace boost namespace boost
{ {
namespace executors namespace executors
{ {
class scheduled_thread_pool : public detail::scheduled_executor_base<> template <class Clock = chrono::steady_clock>
class scheduled_thread_pool
{ {
private: private:
thread_group _workers;
public:
scheduled_thread_pool(size_t num_threads) : super() struct shared_state : public detail::scheduled_executor_base<> {
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
typedef detail::scheduled_executor_base<> super;
typedef typename super::work work;
typedef scoped_thread<> thread_t;
typedef csbl::vector<thread_t> thread_vector;
thread_vector threads;
shared_state(unsigned const thread_count = thread::hardware_concurrency()+1) : super()
{ {
for(size_t i = 0; i < num_threads; i++)
try
{ {
_workers.create_thread(bind(&super::loop, this)); threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
#if 1
thread th (&shared_state::loop, this);
threads.push_back(thread_t(boost::move(th)));
#else
threads.push_back(thread_t(&shared_state::loop, this)); // do not compile
#endif
}
}
catch (...)
{
close();
throw;
} }
} }
~scheduled_thread_pool() /**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~shared_state()
{ {
this->close(); this->close();
_workers.join_all();
} }
private:
typedef detail::scheduled_executor_base<> super;
}; //end class }; //end class
public:
typedef typename shared_state::work work;
typedef Clock clock;
typedef typename clock::duration duration;
typedef typename clock::time_point time_point;
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
scheduled_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
: pimpl(make_shared<shared_state>(thread_count))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~scheduled_thread_pool()
{
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
void submit_at(work w, const time_point& tp)
{
return pimpl->submit_at(boost::move(w), tp);
}
void submit_after(work w, const duration& d)
{
return pimpl->submit_after(boost::move(w), d);
}
private:
shared_ptr<shared_state> pimpl;
};
} //end executors namespace } //end executors namespace
using executors::scheduled_thread_pool; using executors::scheduled_thread_pool;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014 Vicente J. Botet Escriba // Copyright (C) 2014-2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -13,6 +13,8 @@
#include <boost/chrono/time_point.hpp> #include <boost/chrono/time_point.hpp>
#include <boost/chrono/duration.hpp> #include <boost/chrono/duration.hpp>
#include <boost/chrono/system_clocks.hpp> #include <boost/chrono/system_clocks.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
@@ -36,7 +38,7 @@ namespace boost
} }
private: private:
Executor& ex; Executor ex;
Function funct; Function funct;
}; };
@@ -100,8 +102,8 @@ namespace boost
} }
private: private:
Scheduler& sch; Scheduler sch;
Executor& ex; Executor ex;
typename clock::time_point tp; typename clock::time_point tp;
bool is_closed; bool is_closed;
}; };
@@ -150,8 +152,8 @@ namespace boost
} }
private: private:
Scheduler& sch; Scheduler sch;
Executor& ex; Executor ex;
}; //end class }; //end class
/// Wraps a reference to a @c Scheduler providing an @c Executor that /// Wraps a reference to a @c Scheduler providing an @c Executor that
@@ -208,7 +210,7 @@ namespace boost
} }
private: private:
Scheduler& sch; Scheduler sch;
time_point tp; time_point tp;
bool is_closed; bool is_closed;
}; //end class }; //end class
@@ -217,22 +219,71 @@ namespace boost
/// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor /// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor
/// that submit the work at/after a specific time/duration respectively. /// that submit the work at/after a specific time/duration respectively.
template <class Clock = chrono::steady_clock> template <class Clock = chrono::steady_clock>
class scheduler : public detail::scheduled_executor_base<Clock> class scheduler
{ {
public: private:
typedef typename detail::scheduled_executor_base<Clock>::work work;
typedef Clock clock; struct shared_state : public detail::scheduled_executor_base<Clock> {
typedef detail::scheduled_executor_base<Clock> super;
typedef typename super::work work;
thread thr;
scheduler() /// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
shared_state()
: super(), : super(),
thr(&super::loop, this) {} thr(&super::loop, this) {}
~scheduler() ~shared_state()
{ {
this->close(); this->close();
thr.join(); thr.join();
} }
};
public:
typedef typename shared_state::work work;
typedef Clock clock;
typedef typename clock::duration duration;
typedef typename clock::time_point time_point;
scheduler()
: pimpl(make_shared<shared_state>())
{}
~scheduler()
{
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
void submit_at(work w, const time_point& tp)
{
return pimpl->submit_at(boost::move(w), tp);
}
void submit_after(work w, const duration& d)
{
return pimpl->submit_after(boost::move(w), d);
}
template <class Ex> template <class Ex>
scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex) scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex)
{ {
@@ -250,13 +301,10 @@ namespace boost
{ {
return at_executor<scheduler>(*this, tp); return at_executor<scheduler>(*this, tp);
} }
private: private:
typedef detail::scheduled_executor_base<Clock> super; shared_ptr<shared_state> pimpl;
thread thr;
}; };
} }
using executors::resubmitter; using executors::resubmitter;
using executors::resubmit; using executors::resubmit;

View File

@@ -19,7 +19,7 @@ namespace executors
class scheduling_adpator : public detail::scheduled_executor_base<> class scheduling_adpator : public detail::scheduled_executor_base<>
{ {
private: private:
Executor& _exec; Executor _exec;
thread _scheduler; thread _scheduler;
public: public:

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013 Vicente J. Botet Escriba // Copyright (C) 2013,2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -14,27 +14,34 @@
#include <boost/thread/detail/move.hpp> #include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp> #include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp> #include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp> #include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/future.hpp> #include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp> #include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
{ {
namespace executors namespace executors
{ {
template <class Executor>
class serial_executor class serial_executor
{ {
public: public:
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
private: private:
struct shared_state {
typedef executors::work work;
typedef scoped_thread<> thread_t; typedef scoped_thread<> thread_t;
/// the thread safe work queue /// the thread safe work queue
concurrent::sync_queue<work > work_queue; concurrent::sync_queue<work > work_queue;
generic_executor_ref ex; Executor ex;
thread_t thr; thread_t thr;
struct try_executing_one_task { struct try_executing_one_task {
@@ -57,81 +64,55 @@ namespace executors
* \par Returns * \par Returns
* The underlying executor wrapped on a generic executor reference. * The underlying executor wrapped on a generic executor reference.
*/ */
generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } Executor& underlying_executor() BOOST_NOEXCEPT { return ex; }
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
work task;
try
{
if (work_queue.try_pull(task) == queue_op_status::success)
{
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
return true;
}
return false;
}
catch (...)
{
std::terminate();
return false;
}
}
private: private:
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
this_thread::yield();
}
}
/** /**
* The main loop of the worker thread * The main loop of the worker thread
*/ */
void worker_thread() void worker_thread()
{ {
while (!closed()) try
{ {
schedule_one_or_yield(); for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return;
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
} }
while (try_executing_one()) }
catch (...)
{ {
std::terminate();
return;
} }
} }
public: public:
/// serial_executor is not copyable. /// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(serial_executor) BOOST_THREAD_NO_COPYABLE(shared_state)
/** /**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
* *
* \b Throws: Whatever exception is thrown while initializing the needed resources. * \b Throws: Whatever exception is thrown while initializing the needed resources.
*/ */
template <class Executor> shared_state(Executor& ex)
serial_executor(Executor& ex) : ex(ex), thr(&shared_state::worker_thread, this)
: ex(ex), thr(&serial_executor::worker_thread, this)
{ {
} }
/** /**
* \b Effects: Destroys the thread pool. * \b Effects: Destroys the thread pool.
* *
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor. * \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor.
*/ */
~serial_executor() ~shared_state()
{ {
// signal to the worker thread that there will be no more submissions. // signal to the worker thread that there will be no more submissions.
close(); close();
@@ -166,13 +147,13 @@ namespace executors
* Whatever exception that can be throw while storing the closure. * Whatever exception that can be throw while storing the closure.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
work_queue.push(work(closure)); work_queue.push(work(closure));
@@ -184,23 +165,104 @@ namespace executors
work_queue.push(work(boost::forward<Closure>(closure))); work_queue.push(work(boost::forward<Closure>(closure)));
} }
};
public:
/** /**
* \b Requires: This must be called from an scheduled task. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
* *
* \b Effects: reschedule functions until pred() * \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
serial_executor(Executor& ex)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
*/
~serial_executor()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
Executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/ */
template <typename Pred> template <typename Pred>
bool reschedule_until(Pred const& pred) bool reschedule_until(Pred const& pred)
{
do {
if ( ! try_executing_one())
{ {
return false; return false;
} }
} while (! pred()); private:
return true; shared_ptr<shared_state> pimpl;
}
}; };
} }
using executors::serial_executor; using executors::serial_executor;

View File

@@ -12,18 +12,20 @@
#include <boost/thread/detail/config.hpp> #include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp> #include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp> #include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp> #include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/future.hpp> #include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp> #include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
{ {
namespace executors namespace executors
{ {
template <class Executor>
class serial_executor_cont class serial_executor_cont
{ {
public: public:
@@ -31,7 +33,10 @@ namespace executors
typedef executors::work work; typedef executors::work work;
private: private:
generic_executor_ref ex_; struct shared_state {
typedef executors::work work;
Executor ex_;
future<void> fut_; // protected by mtx_ future<void> fut_; // protected by mtx_
bool closed_; // protected by mtx_ bool closed_; // protected by mtx_
mutex mtx_; mutex mtx_;
@@ -63,10 +68,10 @@ namespace executors
* \par Returns * \par Returns
* The underlying executor wrapped on a generic executor reference. * The underlying executor wrapped on a generic executor reference.
*/ */
generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; } Executor& underlying_executor() BOOST_NOEXCEPT { return ex_; }
/// serial_executor_cont is not copyable. /// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(serial_executor_cont) BOOST_THREAD_NO_COPYABLE(shared_state)
/** /**
* \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
@@ -79,8 +84,7 @@ namespace executors
* - the executor must execute the continuation asynchronously or * - the executor must execute the continuation asynchronously or
* - the continuation can not submit to this serial executor. * - the continuation can not submit to this serial executor.
*/ */
template <class Executor> shared_state(Executor& ex)
serial_executor_cont(Executor& ex)
: ex_(ex), fut_(make_ready_future()), closed_(false) : ex_(ex), fut_(make_ready_future()), closed_(false)
{ {
} }
@@ -89,7 +93,7 @@ namespace executors
* *
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor. * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor.
*/ */
~serial_executor_cont() ~shared_state()
{ {
// signal to the worker thread that there will be no more submissions. // signal to the worker thread that there will be no more submissions.
close(); close();
@@ -114,17 +118,6 @@ namespace executors
return closed(lk); return closed(lk);
} }
/**
* Effects: none.
* Returns: always false.
* Throws: No.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/** /**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
* *
@@ -136,7 +129,7 @@ namespace executors
* *
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
@@ -144,7 +137,7 @@ namespace executors
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure))); fut_ = fut_.then(ex_, continuation(work(closure)));
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
lock_guard<mutex> lk(mtx_); lock_guard<mutex> lk(mtx_);
@@ -159,7 +152,102 @@ namespace executors
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure)))); fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
} }
};
public:
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
serial_executor_cont(Executor& ex)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
*/
~serial_executor_cont()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
Executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
}; };
} }
using executors::serial_executor_cont; using executors::serial_executor_cont;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014 Vicente J. Botet Escriba // Copyright (C) 2014-2015 Vicente J. Botet Escriba
// //
// Distributed under the Boost Software License, Version 1.0. (See accompanying // Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -18,6 +18,9 @@
#include <boost/thread/scoped_thread.hpp> #include <boost/thread/scoped_thread.hpp>
#include <boost/thread/csbl/vector.hpp> #include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp> #include <boost/config/abi_prefix.hpp>
namespace boost namespace boost
@@ -29,12 +32,40 @@ namespace executors
public: public:
/// type-erasure to store the works to do /// type-erasure to store the works to do
typedef executors::work work; typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
bool closed_; bool closed_;
typedef scoped_thread<> thread_t; typedef scoped_thread<> thread_t;
typedef csbl::vector<thread_t> threads_type; typedef csbl::vector<thread_t> threads_type;
threads_type threads_; threads_type threads_;
mutable mutex mtx_; mutable mutex mtx_;
/// thread_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
shared_state()
: closed_(false)
{
}
/**
* \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
~shared_state()
{
// signal to all the worker thread that there will be no more submissions.
close();
// all the scoped threads will join before destroying
}
/** /**
* Effects: try to execute one task. * Effects: try to execute one task.
* Returns: whether a task has been executed. * Returns: whether a task has been executed.
@@ -45,31 +76,6 @@ namespace executors
return false; return false;
} }
public:
/// thread_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(thread_executor)
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
thread_executor()
: closed_(false)
{
}
/**
* \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
~thread_executor()
{
// signal to all the worker thread that there will be no more submissions.
close();
// all the scoped threads will join before destroying
}
/** /**
* \b Effects: close the \c thread_executor for submissions. * \b Effects: close the \c thread_executor for submissions.
* The loop will work until there is no more closures to run. * The loop will work until there is no more closures to run.
@@ -105,7 +111,7 @@ namespace executors
* Whatever exception that can be throw while storing the closure. * Whatever exception that can be throw while storing the closure.
*/ */
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure> template <typename Closure>
void submit(Closure & closure) void submit(Closure & closure)
{ {
@@ -115,7 +121,7 @@ namespace executors
thread th(closure); thread th(closure);
threads_.push_back(thread_t(boost::move(th))); threads_.push_back(thread_t(boost::move(th)));
} }
#endif #endif
void submit(void (*closure)()) void submit(void (*closure)())
{ {
lock_guard<mutex> lk(mtx_); lock_guard<mutex> lk(mtx_);
@@ -145,8 +151,97 @@ namespace executors
{ {
return false; return false;
} }
}; };
public:
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
thread_executor()
: pimpl(make_shared<shared_state>())
{
}
/**
* \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
~thread_executor()
{
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
/**
* \b Effects: close the \c thread_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c thread_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& p)
{
return pimpl->reschedule_until(p);
}
private:
shared_ptr<shared_state> pimpl;
};
} }
using executors::thread_executor; using executors::thread_executor;
} }

View File

@@ -116,9 +116,9 @@ BOOST_THREAD_INLINE_NAMESPACE(v2)
template<typename F> template<typename F>
friend void task_region_final(BOOST_THREAD_FWD_REF(F) f); friend void task_region_final(BOOST_THREAD_FWD_REF(F) f);
template <class Ex, typename F> template <class Ex, typename F>
friend void task_region(Ex&, BOOST_THREAD_FWD_REF(F) f); friend void task_region(Ex const&, BOOST_THREAD_FWD_REF(F) f);
template<class Ex, typename F> template<class Ex, typename F>
friend void task_region_final(Ex&, BOOST_THREAD_FWD_REF(F) f); friend void task_region_final(Ex const&, BOOST_THREAD_FWD_REF(F) f);
void wait_all() void wait_all()
{ {
@@ -153,21 +153,20 @@ protected:
#if defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS #if defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS
task_region_handle_gen() task_region_handle_gen()
: canceled(false) : canceled(false)
, ex(0)
{} {}
task_region_handle_gen(Executor& ex) task_region_handle_gen(Executor const& ex)
: canceled(false) : canceled(false)
, ex(&ex) , ex(ex)
{} {}
#endif #endif
#if ! defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS #if ! defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS
task_region_handle_gen() task_region_handle_gen()
: ex(0) //: ex(0)
{} {}
task_region_handle_gen(Executor& ex) task_region_handle_gen(Executor const& ex)
: ex(&ex) : ex(ex)
{} {}
#endif #endif
@@ -188,7 +187,7 @@ protected:
bool canceled; bool canceled;
#endif #endif
#if defined BOOST_THREAD_PROVIDES_EXECUTORS #if defined BOOST_THREAD_PROVIDES_EXECUTORS
Executor* ex; Executor ex;
#endif #endif
exception_list exs; exception_list exs;
typedef csbl::vector<future<void> > group_type; typedef csbl::vector<future<void> > group_type;
@@ -211,13 +210,13 @@ protected:
} }
} }
#if defined BOOST_THREAD_PROVIDES_EXECUTORS #if defined BOOST_THREAD_PROVIDES_EXECUTORS
group.push_back(async(*ex, detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f)))); group.push_back(async(ex, detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f))));
#else #else
group.push_back(async(detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f)))); group.push_back(async(detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f))));
#endif #endif
#else #else
#if defined BOOST_THREAD_PROVIDES_EXECUTORS #if defined BOOST_THREAD_PROVIDES_EXECUTORS
group.push_back(async(*ex, forward<F>(f))); group.push_back(async(ex, forward<F>(f)));
#else #else
group.push_back(async(forward<F>(f))); group.push_back(async(forward<F>(f)));
#endif #endif
@@ -245,17 +244,18 @@ protected:
class task_region_handle : class task_region_handle :
public task_region_handle_gen<default_executor> public task_region_handle_gen<default_executor>
{ {
default_executor tp; //default_executor tp;
template <typename F> template <typename F>
friend void task_region(BOOST_THREAD_FWD_REF(F) f); friend void task_region(BOOST_THREAD_FWD_REF(F) f);
template<typename F> template<typename F>
friend void task_region_final(BOOST_THREAD_FWD_REF(F) f); friend void task_region_final(BOOST_THREAD_FWD_REF(F) f);
protected: protected:
task_region_handle() : task_region_handle_gen<default_executor>() task_region_handle()
: task_region_handle_gen<default_executor>()
{ {
#if defined BOOST_THREAD_PROVIDES_EXECUTORS #if defined BOOST_THREAD_PROVIDES_EXECUTORS
ex = &tp; //ex = &tp;
#endif #endif
} }
BOOST_DELETED_FUNCTION(task_region_handle(const task_region_handle&)) BOOST_DELETED_FUNCTION(task_region_handle(const task_region_handle&))
@@ -265,7 +265,7 @@ protected:
}; };
template <typename Executor, typename F> template <typename Executor, typename F>
void task_region_final(Executor& ex, BOOST_THREAD_FWD_REF(F) f) void task_region_final(Executor const& ex, BOOST_THREAD_FWD_REF(F) f)
{ {
task_region_handle_gen<Executor> tr(ex); task_region_handle_gen<Executor> tr(ex);
try try
@@ -280,7 +280,7 @@ protected:
} }
template <typename Executor, typename F> template <typename Executor, typename F>
void task_region(Executor& ex, BOOST_THREAD_FWD_REF(F) f) void task_region(Executor const& ex, BOOST_THREAD_FWD_REF(F) f)
{ {
task_region_final(ex, forward<F>(f)); task_region_final(ex, forward<F>(f));
} }

View File

@@ -810,7 +810,10 @@ rule thread-compile ( sources : reqs * : name )
[ thread-run2 ../example/user_scheduler.cpp : ex_user_scheduler ] [ thread-run2 ../example/user_scheduler.cpp : ex_user_scheduler ]
[ thread-run2 ../example/executor.cpp : ex_executor ] [ thread-run2 ../example/executor.cpp : ex_executor ]
[ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref ] [ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref ]
[ thread-run2 ../example/generic_executor.cpp : ex_generic_executor ]
[ thread-run2 ../example/generic_serial_executor.cpp : ex_generic_serial_executor ]
[ thread-run2 ../example/serial_executor.cpp : ex_serial_executor ] [ thread-run2 ../example/serial_executor.cpp : ex_serial_executor ]
[ thread-run2 ../example/generic_serial_executor_cont.cpp : ex_generic_serial_executor_cont ]
[ thread-run2 ../example/serial_executor_cont.cpp : ex_serial_executor_cont ] [ thread-run2 ../example/serial_executor_cont.cpp : ex_serial_executor_cont ]
[ thread-run2 ../example/future_when_all.cpp : ex_future_when_all ] [ thread-run2 ../example/future_when_all.cpp : ex_future_when_all ]
[ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate ] [ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate ]

View File

@@ -24,7 +24,7 @@
using namespace boost::chrono; using namespace boost::chrono;
typedef boost::scheduled_thread_pool scheduled_tp; typedef boost::scheduled_thread_pool<> scheduled_tp;
void fn(int x) void fn(int x)
{ {
@@ -46,19 +46,18 @@ void func2(scheduled_tp* tp, steady_clock::duration d)
void test_timing(const int n) void test_timing(const int n)
{ {
//This function should take n seconds to execute. //This function should take n seconds to execute.
boost::scheduled_thread_pool se(4); boost::scheduled_thread_pool<> se(4);
for(int i = 1; i <= n; i++) for(int i = 1; i <= n; i++)
{ {
se.submit_after(boost::bind(fn,i), milliseconds(i*100)); se.submit_after(boost::bind(fn,i), milliseconds(i*100));
} }
boost::this_thread::sleep_for(boost::chrono::seconds(10));
//dtor is called here so all task will have to be executed before we return //dtor is called here so all task will have to be executed before we return
} }
void test_deque_timing() void test_deque_timing()
{ {
boost::scheduled_thread_pool se(4); boost::scheduled_thread_pool<> se(4);
for(int i = 0; i < 10; i++) for(int i = 0; i < 10; i++)
{ {
steady_clock::duration d = milliseconds(i*100); steady_clock::duration d = milliseconds(i*100);
@@ -85,10 +84,10 @@ void test_deque_multi(const int n)
int main() int main()
{ {
steady_clock::time_point start = steady_clock::now(); //steady_clock::time_point start = steady_clock::now();
test_timing(5); test_timing(5);
steady_clock::duration diff = steady_clock::now() - start; //steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > milliseconds(500)); //BOOST_TEST(diff > milliseconds(500));
test_deque_timing(); test_deque_timing();
test_deque_multi(4); test_deque_multi(4);
test_deque_multi(8); test_deque_multi(8);

View File

@@ -27,7 +27,6 @@ typedef boost::executors::basic_thread_pool thread_pool;
void fn(int x) void fn(int x)
{ {
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl; std::cout << x << std::endl;
} }
@@ -75,7 +74,7 @@ int main()
test_after(5, sch); test_after(5, sch);
test_at(5, sch); test_at(5, sch);
test_on(5, sch, tp); test_on(5, sch, tp);
boost::this_thread::sleep_for(boost::chrono::seconds(10)); std::cout << "[" << __LINE__ << "] " << std::endl;
return boost::report_errors(); return boost::report_errors();
} }

View File

@@ -28,7 +28,6 @@ typedef boost::executors::basic_thread_pool thread_pool;
void fn(int x) void fn(int x)
{ {
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl; std::cout << x << std::endl;
} }
@@ -41,14 +40,10 @@ void test_timing(const int n)
sa.submit_after(boost::bind(fn,i),seconds(i)); sa.submit_after(boost::bind(fn,i),seconds(i));
sa.submit_after(boost::bind(fn,i), milliseconds(i*100)); sa.submit_after(boost::bind(fn,i), milliseconds(i*100));
} }
boost::this_thread::sleep_for(boost::chrono::seconds(10));
} }
int main() int main()
{ {
steady_clock::time_point start = steady_clock::now();
test_timing(5); test_timing(5);
steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > seconds(5));
return boost::report_errors(); return boost::report_errors();
} }