diff --git a/doc/async_executors.qbk b/doc/async_executors.qbk index c9311bdd..2529446f 100644 --- a/doc/async_executors.qbk +++ b/doc/async_executors.qbk @@ -1,5 +1,5 @@ [/ - / Copyright (c) 2014 Vicente J. Botet Escriba + / Copyright (c) 2014-2015 Vicente J. Botet Escriba / / Distributed under the Boost Software License, Version 1.0. (See accompanying / file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -174,7 +174,7 @@ This has several advantages: * The scheduled operations are available for all the executors via wrappers. * The template functions could accept any chrono `time_point` and `duration` respectively as we are not working with virtual functions. -In order to manage with all the clocks, this library propose generic solution. `scheduler` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ. +In order to manage with all the clocks, this library propose a generic solution. `scheduler` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ. [heading Not Handled Exceptions] As in N3785 and based on the same design decision than `std`/`boost::thread` if a user closure throws an exception, the executor must call the `std::terminate` function. @@ -506,9 +506,6 @@ Executor abstract base class. public: typedef boost::work work; - executor(executor const&) = delete; - executor& operator=(executor const&) = delete; - executor(); virtual ~executor() {}; @@ -573,9 +570,6 @@ Polymorphic adaptor of a model of Executor to an executor. public: typedef executor::work work; - executor_adaptor(executor_adaptor const&) = delete; - executor_adaptor& operator=(executor_adaptor const&) = delete; - template executor_adaptor(Args&& ... args); @@ -641,20 +635,24 @@ Polymorphic adaptor of a model of Executor to an executor. [/////////////////////////////////] [section:generic_executor_ref Class `generic_executor_ref`] -Executor abstract base class. +Type erased executor class. #include namespace boost { class generic_executor_ref { public: - generic_executor_ref(generic_executor_ref const&); - generic_executor_ref& operator=(generic_executor_ref const&); template generic_executor_ref(Executor& ex); - generic_executor_ref() {}; + generic_executor_ref() = delete; + generic_executor_ref(generic_executor_ref const&) = default; + generic_executor_ref(generic_executor_ref &&) = default; + generic_executor_ref& operator=(generic_executor_ref const&) = default; + generic_executor_ref& operator=(generic_executor_ref &&) = default; + executor& underlying_executor() noexcept; + void close() = 0; bool closed() = 0; @@ -669,6 +667,44 @@ Executor abstract base class. [endsect] + +[/////////////////////////////////] +[section:generic_executor Class `generic_executor`] + +Type erased executor class. + + #include + namespace boost { + class generic_executor + { + public: + + template + generic_executor(Executor& ex); + generic_executor() = delete; + + generic_executor(generic_executor const&) = default; + generic_executor(generic_executor &&) = default; + generic_executor& operator=(generic_executor const&) = default; + generic_executor& operator=(generic_executor &&) = default; + + executor& underlying_executor() noexcept; + + void close() = 0; + bool closed() = 0; + + template + void submit(Closure&& closure); + + virtual bool try_executing_one() = 0; + template + bool reschedule_until(Pred const& pred); + }; + } + +[endsect] + + [//////////////////////////////////////////////////////////] [section: scheduler Template Class `scheduler `] @@ -683,10 +719,7 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec public: using work = boost::function ; using clock = Clock; - - scheduler(scheduler const&) = delete; - scheduler& operator=(scheduler const&) = delete; - + scheduler(); ~scheduler(); @@ -733,7 +766,7 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec [[Effects:] [Destroys the scheduler.]] -[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]] +[[Synchronization:] [The completion of all the closures happen before the completion of the destructor.]] ] @@ -1339,10 +1372,6 @@ A serial executor ensuring that there are no two work units that executes concur class serial_executor { public: - serial_executor(serial_executor const&) = delete; - serial_executor& operator=(serial_executor const&) = delete; - - template serial_executor(Executor& ex); Executor& underlying_executor() noexcept; @@ -1393,7 +1422,7 @@ A serial executor ensuring that there are no two work units that executes concur [/////////////////////////////////////] [section:underlying_executor Function member `underlying_executor()`] - generic_executor_ref& underlying_executor() noexcept; + Executor& underlying_executor() noexcept; [variablelist @@ -1418,13 +1447,10 @@ A serial executor ensuring that there are no two work units that executes concur class generic_serial_executor { public: - generic_serial_executor(generic_serial_executor const&) = delete; - generic_serial_executor& operator=(generic_serial_executor const&) = delete; - template generic_serial_executor(Executor& ex); - generic_executor_ref& underlying_executor() noexcept; + generic_executor& underlying_executor() noexcept; void close(); bool closed(); @@ -1433,6 +1459,7 @@ A serial executor ensuring that there are no two work units that executes concur void submit(Closure&& closure); bool try_executing_one(); + template bool reschedule_until(Pred const& pred); @@ -1456,7 +1483,7 @@ A serial executor ensuring that there are no two work units that executes concur [endsect] [/////////////////////////////////////] -[section:destructor Destructor `~serial_executor()`] +[section:destructor Destructor `~generic_serial_executor()`] ~generic_serial_executor(); @@ -1468,6 +1495,80 @@ A serial executor ensuring that there are no two work units that executes concur ] +[endsect] +[/////////////////////////////////////] +[section:underlying_executor Function member `underlying_executor()`] + + generic_executor& underlying_executor() noexcept; + +[variablelist + +[[Return:] [The underlying executor instance. ]] + +] + + +[endsect] + +[endsect] + +[//////////////////////////////////////////////////////////] +[section:serial_executor_cont Template Class `serial_executor_cont`] + +A serial executor ensuring that there are no two work units that executes concurrently. + + #include + namespace boost { + template + class serial_executor_cont + { + public: + serial_executor_cont(Executor& ex); + + Executor& underlying_executor() noexcept; + + void close(); + bool closed(); + + template + void submit(Closure&& closure); + + bool try_executing_one(); + template + bool reschedule_until(Pred const& pred); + + }; + } + +[/////////////////////////////////////] +[section:constructor Constructor `serial_executor_cont(Executor&)`] + + template + serial_executor_cont(Executor& ex); + +[variablelist + +[[Effects:] [Constructs a serial_executor_cont. ]] + +[[Throws:] [Nothing. ]] + +] + + +[endsect] +[/////////////////////////////////////] +[section:destructor Destructor `~serial_executor_cont()`] + + ~serial_executor_cont(); + +[variablelist + +[[Effects:] [Destroys the serial_executor.]] + +[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]] + +] + [endsect] [/////////////////////////////////////] [section:underlying_executor Function member `underlying_executor()`] @@ -1478,6 +1579,8 @@ A serial executor ensuring that there are no two work units that executes concur [[Return:] [The underlying executor instance. ]] +[[Throws:] [Nothing.]] + ] @@ -1485,6 +1588,82 @@ A serial executor ensuring that there are no two work units that executes concur [endsect] +[//////////////////////////////////////////////////////////] +[section:generic_serial_cont_executor Class `generic_serial_cont_executor`] + +A serial executor ensuring that there are no two work units that executes concurrently. + + #include + namespace boost { + class generic_serial_cont_executor + { + public: + template + generic_serial_cont_executor(Executor& ex); + + generic_executor& underlying_executor() noexcept; + + void close(); + bool closed(); + + template + void submit(Closure&& closure); + + bool try_executing_one(); + + template + bool reschedule_until(Pred const& pred); + + }; + } + +[/////////////////////////////////////] +[section:constructor Constructor `generic_serial_cont_executor(Executor&)`] + + template + generic_serial_cont_executor(Executor& ex); + +[variablelist + +[[Effects:] [Constructs a generic_serial_cont_executor. ]] + +[[Throws:] [Nothing. ]] + +] + + +[endsect] +[/////////////////////////////////////] +[section:destructor Destructor `~generic_serial_cont_executor()`] + + ~generic_serial_cont_executor(); + +[variablelist + +[[Effects:] [Destroys the generic_serial_cont_executor.]] + +[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]] + +] + +[endsect] +[/////////////////////////////////////] +[section:underlying_executor Function member `underlying_executor()`] + + generic_executor& underlying_executor() noexcept; + +[variablelist + +[[Return:] [The underlying executor instance. ]] + +] + + +[endsect] + +[endsect] + + [//////////////////////////////////////////////////////////] [section:inline_executor Class `inline_executor`] @@ -1496,10 +1675,8 @@ A serial executor ensuring that there are no two work units that executes concur class inline_executor { public: - inline_executor(inline_executor const&) = delete; - inline_executor& operator=(inline_executor const&) = delete; - inline_executor(); + ~inline_executor(); void close(); bool closed(); @@ -1560,9 +1737,6 @@ A thread pool with up to a fixed number of threads. class basic_thread_pool { public: - - basic_thread_pool(basic_thread_pool const&) = delete; - basic_thread_pool& operator=(basic_thread_pool const&) = delete; basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()); template @@ -1623,9 +1797,6 @@ A thread_executor with a threads for each task. { public: - thread_executor(thread_executor const&) = delete; - thread_executor& operator=(thread_executor const&) = delete; - thread_executor(); template basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry); @@ -1680,9 +1851,6 @@ A user scheduled executor. class loop_executor { public: - - loop_executor(loop_executor const&) = delete; - loop_executor& operator=(loop_executor const&) = delete; loop_executor(); ~loop_executor(); diff --git a/example/executor.cpp b/example/executor.cpp index f0380e9f..fae3e9e5 100644 --- a/example/executor.cpp +++ b/example/executor.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include #include @@ -73,7 +73,7 @@ void submit_some(boost::executor& tp) } -void at_th_entry(boost::basic_thread_pool& ) +void at_th_entry(boost::basic_thread_pool ) { } @@ -84,6 +84,10 @@ int test_executor_adaptor() { try { + { + boost::basic_thread_pool e1; + boost::basic_thread_pool e2 = e1; + } { boost::executor_adaptor < boost::basic_thread_pool > ea(4); std::cout << BOOST_CONTEXTOF << std::endl; @@ -119,25 +123,65 @@ int test_executor_adaptor() std::cout << BOOST_CONTEXTOF << std::endl; } std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::loop_executor e1; + boost::loop_executor e2 = e1; + boost::executor_adaptor < boost::loop_executor > ea2(e2); + submit_some( ea2); + ea2.underlying_executor().run_queued_closures(); + } { boost::executor_adaptor < boost::loop_executor > ea2; submit_some( ea2); ea2.underlying_executor().run_queued_closures(); } -#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) - std::cout << BOOST_CONTEXTOF << std::endl; + // std::cout << BOOST_CONTEXTOF << std::endl; { - boost::executor_adaptor < boost::basic_thread_pool > ea1(4); - boost::executor_adaptor < boost::serial_executor > ea2(ea1); + boost::basic_thread_pool tp; + boost::generic_serial_executor e1(tp); + boost::generic_serial_executor e2 = e1; + } + { + boost::basic_thread_pool ea1(4); + boost::generic_serial_executor ea2(ea1); + boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2); + submit_some(ea3); + } + { + boost::basic_thread_pool ea1(4); + boost::generic_serial_executor ea2(ea1); + boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2); + submit_some(ea3); + } +//#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + { + boost::basic_thread_pool ea1(4); + boost::executor_adaptor < boost::generic_serial_executor > ea2(ea1); + submit_some(ea2); + } +//#endif + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::inline_executor e1; + boost::inline_executor e2 = e1; + boost::executor_adaptor < boost::inline_executor > ea2(e2); submit_some(ea2); } -#endif - std::cout << BOOST_CONTEXTOF << std::endl; { boost::executor_adaptor < boost::inline_executor > ea1; submit_some(ea1); } std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::thread_executor e1; + boost::thread_executor e2 = e1; + } + { + boost::thread_executor e1; + boost::executor_adaptor < boost::generic_executor > ea2(e1); + submit_some(ea2); + } + { boost::executor_adaptor < boost::thread_executor > ea1; submit_some(ea1); diff --git a/example/generic_executor.cpp b/example/generic_executor.cpp new file mode 100644 index 00000000..67429b5e --- /dev/null +++ b/example/generic_executor.cpp @@ -0,0 +1,184 @@ +// Copyright (C) 2014 Vicente Botet +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if ! defined BOOST_NO_CXX11_DECLTYPE +#define BOOST_RESULT_OF_USE_DECLTYPE +#endif + +#define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_PROVIDES_EXECUTORS +//#define BOOST_THREAD_USES_LOG +#define BOOST_THREAD_USES_LOG_THREAD_ID +#define BOOST_THREAD_QUEUE_DEPRECATE_OLD + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void p1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + //boost::this_thread::sleep_for(boost::chrono::milliseconds(200)); +} + +void p2() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + //boost::this_thread::sleep_for(boost::chrono::seconds(10)); +} + +int f1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + return 1; +} +int f2(int i) +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(2)); + return i + 1; +} + +void submit_some(boost::generic_executor tp) +{ + for (int i = 0; i < 3; ++i) { + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + tp.submit(&p1); + } + +} + +template < class Executor> +void submit_some2(Executor& tp) +{ + for (int i = 0; i < 3; ++i) { + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + tp.submit(&p1); + } + +} + +template +void submit_some3(boost::serial_executor& tp) +{ + for (int i = 0; i < 3; ++i) { + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + tp.submit(&p1); + } +} + +void at_th_entry(boost::basic_thread_pool) +{ + +} + + + +int test_generic_executor() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + { + try + { + { + boost::basic_thread_pool ea(4); + submit_some( ea); + { + boost::future t1 = boost::async(ea, &f1); + boost::future t2 = boost::async(ea, &f1); + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl; + } + submit_some(ea); + { + boost::basic_thread_pool ea3(1); + boost::future t1 = boost::async(ea3, &f1); + boost::future t2 = boost::async(ea3, &f1); + //boost::future t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11 + //boost::future t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98 + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl; + } + submit_some(ea); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::loop_executor ea2; + submit_some( ea2); + ea2.run_queued_closures(); + } +#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) +// // std::cout << BOOST_CONTEXTOF << std::endl; +// { +// boost::basic_thread_pool ea1(4); +// boost::generic_serial_executor ea2(ea1); +// submit_some(ea2); +// } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea1(4); + boost::serial_executor ea2(ea1); + submit_some3(ea2); + } +#endif + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::inline_executor ea1; + submit_some(ea1); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + //boost::thread_executor ea1; + //submit_some(ea1); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea(4, at_th_entry); + boost::future t1 = boost::async(ea, &f1); + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + } + } + catch (std::exception& ex) + { + std::cout << "ERROR= " << ex.what() << "" << std::endl; + return 1; + } + catch (...) + { + std::cout << " ERROR= exception thrown" << std::endl; + return 2; + } + } + // std::cout << BOOST_CONTEXTOF << std::endl; + return 0; +} + + +int main() +{ + return test_generic_executor(); + + +} diff --git a/example/generic_executor_ref.cpp b/example/generic_executor_ref.cpp index 9e61d5dc..7090bb6d 100644 --- a/example/generic_executor_ref.cpp +++ b/example/generic_executor_ref.cpp @@ -17,11 +17,12 @@ #include #include #include -#include +#include #include #include #include #include +#include #include #include #include @@ -64,7 +65,7 @@ void submit_some(boost::generic_executor_ref tp) } -void at_th_entry(boost::basic_thread_pool& ) +void at_th_entry(boost::basic_thread_pool) { } @@ -108,7 +109,7 @@ int test_generic_executor_ref() std::cout << BOOST_CONTEXTOF << std::endl; { boost::basic_thread_pool ea1(4); - boost::serial_executor ea2(ea1); + boost::generic_serial_executor ea2(ea1); submit_some(ea2); } #endif diff --git a/example/generic_serial_executor.cpp b/example/generic_serial_executor.cpp new file mode 100644 index 00000000..453c02f5 --- /dev/null +++ b/example/generic_serial_executor.cpp @@ -0,0 +1,117 @@ +// Copyright (C) 2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if ! defined BOOST_NO_CXX11_DECLTYPE +#define BOOST_RESULT_OF_USE_DECLTYPE +#endif + +#define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_PROVIDES_EXECUTORS +//#define BOOST_THREAD_USES_LOG +#define BOOST_THREAD_USES_LOG_THREAD_ID +#define BOOST_THREAD_QUEUE_DEPRECATE_OLD + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void p1() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::milliseconds(30)); + std::cout << BOOST_CONTEXTOF << std::endl; +} + +void p2() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::milliseconds(10)); + std::cout << BOOST_CONTEXTOF << std::endl; +} + +int f1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + return 1; +} +int f2(int i) +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(2)); + return i + 1; +} + +void submit_some(boost::generic_serial_executor& tp) +{ + //std::cout << BOOST_CONTEXTOF << std::endl; + for (int i = 0; i < 3; ++i) { + //std::cout << BOOST_CONTEXTOF << std::endl; + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + //std::cout << BOOST_CONTEXTOF << std::endl; + tp.submit(&p1); + } + //std::cout << BOOST_CONTEXTOF << std::endl; +} + + +void at_th_entry(boost::basic_thread_pool ) +{ + +} + +int test_executor_adaptor() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool tp; + boost::generic_serial_executor e1(tp); + boost::generic_serial_executor e2 = e1; + } + std::cout << BOOST_CONTEXTOF << std::endl; + { + try + { + +#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea1(4); + boost::generic_serial_executor ea2(ea1); + submit_some(ea2); + } +#endif + // std::cout << BOOST_CONTEXTOF << std::endl; + } + catch (std::exception& ex) + { + std::cout << "ERROR= " << ex.what() << "" << std::endl; + return 1; + } + catch (...) + { + std::cout << " ERROR= exception thrown" << std::endl; + return 2; + } + } + // std::cout << BOOST_CONTEXTOF << std::endl; + return 0; +} + + +int main() +{ + return test_executor_adaptor(); +} diff --git a/example/generic_serial_executor_cont.cpp b/example/generic_serial_executor_cont.cpp new file mode 100644 index 00000000..f39ad46b --- /dev/null +++ b/example/generic_serial_executor_cont.cpp @@ -0,0 +1,118 @@ +// Copyright (C) 2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if ! defined BOOST_NO_CXX11_DECLTYPE +#define BOOST_RESULT_OF_USE_DECLTYPE +#endif + +#define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_PROVIDES_EXECUTORS +//#define BOOST_THREAD_USES_LOG +#define BOOST_THREAD_USES_LOG_THREAD_ID +#define BOOST_THREAD_QUEUE_DEPRECATE_OLD + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void p1() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::milliseconds(30)); + std::cout << BOOST_CONTEXTOF << std::endl; +} + +void p2() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::milliseconds(10)); + std::cout << BOOST_CONTEXTOF << std::endl; +} + +int f1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + return 1; +} +int f2(int i) +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(2)); + return i + 1; +} + +void submit_some(boost::generic_serial_executor_cont& tp) +{ + //std::cout << BOOST_CONTEXTOF << std::endl; + for (int i = 0; i < 3; ++i) { + //std::cout << BOOST_CONTEXTOF << std::endl; + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + //std::cout << BOOST_CONTEXTOF << std::endl; + tp.submit(&p1); + } + //std::cout << BOOST_CONTEXTOF << std::endl; + +} + + +void at_th_entry(boost::basic_thread_pool) +{ + +} + +int test_executor_adaptor() +{ + std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool tp; + boost::generic_serial_executor_cont e1(tp); + boost::generic_serial_executor_cont e2 = e1; + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + try + { + +#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea1(4); + boost::generic_serial_executor_cont ea2(ea1); + submit_some(ea2); + } +#endif + // std::cout << BOOST_CONTEXTOF << std::endl; + } + catch (std::exception& ex) + { + std::cout << "ERROR= " << ex.what() << "" << std::endl; + return 1; + } + catch (...) + { + std::cout << " ERROR= exception thrown" << std::endl; + return 2; + } + } + // std::cout << BOOST_CONTEXTOF << std::endl; + return 0; +} + + +int main() +{ + return test_executor_adaptor(); +} diff --git a/example/serial_executor.cpp b/example/serial_executor.cpp index e3c2c340..36208369 100644 --- a/example/serial_executor.cpp +++ b/example/serial_executor.cpp @@ -51,8 +51,8 @@ int f2(int i) boost::this_thread::sleep_for(boost::chrono::seconds(2)); return i + 1; } - -void submit_some(boost::serial_executor& tp) +template +void submit_some(boost::serial_executor& tp) { for (int i = 0; i < 3; ++i) { std::cout << BOOST_CONTEXTOF << std::endl; @@ -80,9 +80,8 @@ int test_executor_adaptor() #if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) { boost::basic_thread_pool ea1(4); - boost::serial_executor ea2(ea1); + boost::serial_executor ea2(ea1); submit_some(ea2); - boost::this_thread::sleep_for(boost::chrono::seconds(10)); } #endif } diff --git a/example/serial_executor_cont.cpp b/example/serial_executor_cont.cpp index 1883b071..d03164dc 100644 --- a/example/serial_executor_cont.cpp +++ b/example/serial_executor_cont.cpp @@ -52,18 +52,19 @@ int f2(int i) return i + 1; } -void submit_some(boost::serial_executor_cont& tp) +template < class Executor> +void submit_some(boost::serial_executor_cont& tp) { - std::cout << BOOST_CONTEXTOF << std::endl; + //std::cout << BOOST_CONTEXTOF << std::endl; for (int i = 0; i < 3; ++i) { - std::cout << BOOST_CONTEXTOF << std::endl; + //std::cout << BOOST_CONTEXTOF << std::endl; tp.submit(&p2); } for (int i = 0; i < 3; ++i) { - std::cout << BOOST_CONTEXTOF << std::endl; + //std::cout << BOOST_CONTEXTOF << std::endl; tp.submit(&p1); } - std::cout << BOOST_CONTEXTOF << std::endl; + //std::cout << BOOST_CONTEXTOF << std::endl; } @@ -84,9 +85,8 @@ int test_executor_adaptor() // std::cout << BOOST_CONTEXTOF << std::endl; { boost::basic_thread_pool ea1(4); - boost::serial_executor_cont ea2(ea1); + boost::serial_executor_cont ea2(ea1); submit_some(ea2); - boost::this_thread::sleep_for(boost::chrono::seconds(10)); } #endif // std::cout << BOOST_CONTEXTOF << std::endl; diff --git a/include/boost/thread/executors/basic_thread_pool.hpp b/include/boost/thread/executors/basic_thread_pool.hpp index 1cde3fa9..0c8e282c 100644 --- a/include/boost/thread/executors/basic_thread_pool.hpp +++ b/include/boost/thread/executors/basic_thread_pool.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2013-2014 Vicente J. Botet Escriba +// Copyright (C) 2013-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -18,6 +18,11 @@ #include #include +#include +#include +#include +#include + #include namespace boost @@ -30,126 +35,256 @@ namespace executors /// type-erasure to store the works to do typedef executors::work work; private: - typedef thread thread_t; - /// A move aware vector type - typedef csbl::vector thread_vector; - /// A move aware vector - thread_vector threads; - /// the thread safe work queue - concurrent::sync_queue work_queue; + struct shared_state : enable_shared_from_this { + typedef executors::work work; + /// the kind of stored threads are scoped threads to ensure that the threads are joined. + /// A move aware vector type + //typedef scoped_thread<> thread_t; + typedef thread thread_t; + typedef csbl::vector thread_vector; - public: - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - try - { - work task; - if (work_queue.try_pull(task) == queue_op_status::success) - { - task(); - return true; - } - return false; - } - catch (...) - { - std::terminate(); - return false; - } - } - /** - * Effects: schedule one task or yields - * Throws: whatever the current task constructor throws or the task() throws. - */ - void schedule_one_or_yield() - { - if ( ! try_executing_one()) - { - this_thread::yield(); - } - } - private: + /// the thread safe work queue + concurrent::sync_queue work_queue; + /// A move aware vector + thread_vector threads; + unsigned const thread_count; + boost::function at_thread_entry; + friend class basic_thread_pool; - /** - * The main loop of the worker threads - */ - void worker_thread() - { - try + + public: + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { - for(;;) + try { work task; - queue_op_status st = work_queue.wait_pull(task); - if (st == queue_op_status::closed) { - return; + if (work_queue.try_pull(task) == queue_op_status::success) + { + task(); + return true; } - task(); + return false; + } + catch (...) + { + std::terminate(); + return false; } } - catch (...) + /** + * Effects: schedule one task or yields + * Throws: whatever the current task constructor throws or the task() throws. + */ + void schedule_one_or_yield() { - std::terminate(); - return; + if ( ! try_executing_one()) + { + this_thread::yield(); + } } - } -#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) - template - void worker_thread1(AtThreadEntry& at_thread_entry) - { - at_thread_entry(*this); - worker_thread(); - } -#endif - void worker_thread2(void(*at_thread_entry)(basic_thread_pool&)) - { - at_thread_entry(*this); - worker_thread(); - } - template - void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) - { - at_thread_entry(*this); - worker_thread(); - } - static void do_nothing_at_thread_entry(basic_thread_pool&) {} + private: + /** + * The main loop of the worker threads + */ + void worker_thread() + { + // fixme: this call results on segmentation fault + //at_thread_entry(basic_thread_pool(this->shared_from_this())); + try + { + for(;;) + { + work task; + queue_op_status st = work_queue.wait_pull(task); + if (st == queue_op_status::closed) break; + task(); + } + } + catch (...) + { + std::terminate(); + return; + } + } + + static void do_nothing_at_thread_entry(basic_thread_pool) {} + + void init() + { + try + { + threads.reserve(thread_count); + for (unsigned i = 0; i < thread_count; ++i) + { + thread th (&shared_state::worker_thread, this); + threads.push_back(thread_t(boost::move(th))); + } + } + catch (...) + { + close(); + throw; + } + } + + public: + /// basic_thread_pool is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a thread pool that runs closures on \c thread_count threads. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + shared_state(unsigned const thread_count = thread::hardware_concurrency()+1) + : thread_count(thread_count), + at_thread_entry(do_nothing_at_thread_entry) + { + } + /** + * \b Effects: creates a thread pool that runs closures on \c thread_count threads + * and executes the at_thread_entry function at the entry of each created thread. . + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + shared_state( unsigned const thread_count, AtThreadEntry& at_thread_entry) + : thread_count(thread_count), + at_thread_entry(at_thread_entry) + { + } + #endif + shared_state( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool)) + : thread_count(thread_count), + at_thread_entry(at_thread_entry) + { + } + template + shared_state( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) + : thread_count(thread_count), + at_thread_entry(boost::move(at_thread_entry)) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor. + */ + ~shared_state() + { + // signal to all the worker threads that there will be no more submissions. + close(); + join(); + // joins all the threads as the threads were scoped_threads + } + + /** + * \b Effects: join all the threads. + */ + void join() + { + for (unsigned i = 0; i < threads.size(); ++i) + { + if (this_thread::get_id() == threads[i].get_id()) continue; + threads[i].join(); + } + } + + /** + * \b Effects: close the \c basic_thread_pool for submissions. + * The worker threads will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work_queue.push(work(closure)); + } + #endif + void submit(void (*closure)()) + { + work_queue.push(work(closure)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work_queue.push(work(boost::forward(closure))); + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + }; + + /** + * \b Effects: creates a thread pool with this shared state. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + friend struct shared_state; + basic_thread_pool(shared_ptr ptr) + : pimpl(ptr) + { + } public: - /// basic_thread_pool is not copyable. - BOOST_THREAD_NO_COPYABLE(basic_thread_pool) - /** * \b Effects: creates a thread pool that runs closures on \c thread_count threads. * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1) + : pimpl(make_shared(thread_count)) { - try - { - threads.reserve(thread_count); - for (unsigned i = 0; i < thread_count; ++i) - { -#if 1 - thread th (&basic_thread_pool::worker_thread, this); - threads.push_back(thread_t(boost::move(th))); -#else - threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile -#endif - } - } - catch (...) - { - close(); - throw; - } + pimpl->init(); } + /** * \b Effects: creates a thread pool that runs closures on \c thread_count threads * and executes the at_thread_entry function at the entry of each created thread. . @@ -159,61 +294,24 @@ namespace executors #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) template basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry) + : pimpl(make_shared(thread_count, at_thread_entry)) { - try - { - threads.reserve(thread_count); - for (unsigned i = 0; i < thread_count; ++i) - { - thread th (&basic_thread_pool::worker_thread1, this, at_thread_entry); - threads.push_back(thread_t(boost::move(th))); - //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile - } - } - catch (...) - { - close(); - throw; - } + pimpl->init(); } #endif - basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&)) + + basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool)) + : pimpl(make_shared(thread_count, at_thread_entry)) { - try - { - threads.reserve(thread_count); - for (unsigned i = 0; i < thread_count; ++i) - { - thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry); - threads.push_back(thread_t(boost::move(th))); - //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile - } - } - catch (...) - { - close(); - throw; - } + pimpl->init(); } template basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry) + : pimpl(make_shared(thread_count, boost::forward(at_thread_entry))) { - try - { - threads.reserve(thread_count); - for (unsigned i = 0; i < thread_count; ++i) - { - thread th (&basic_thread_pool::worker_thread3, this, boost::forward(at_thread_entry)); - threads.push_back(thread_t(boost::move(th))); - //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile - } - } - catch (...) - { - close(); - throw; - } + pimpl->init(); } + /** * \b Effects: Destroys the thread pool. * @@ -221,10 +319,16 @@ namespace executors */ ~basic_thread_pool() { - // signal to all the worker threads that there will be no more submissions. - close(); - // joins all the threads before destroying the thread pool resources (e.g. the queue). - join(); + } + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); } /** @@ -232,10 +336,7 @@ namespace executors */ void join() { - for (unsigned i = 0; i < threads.size(); ++i) - { - threads[i].join(); - } + pimpl->join(); } /** @@ -244,7 +345,7 @@ namespace executors */ void close() { - work_queue.close(); + pimpl->close(); } /** @@ -252,7 +353,7 @@ namespace executors */ bool closed() { - return work_queue.closed(); + return pimpl->closed(); } /** @@ -271,18 +372,18 @@ namespace executors template void submit(Closure & closure) { - work_queue.push(work(closure)); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - work_queue.push(work(closure)); + pimpl->submit(closure); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push(work(boost::forward(closure))); + pimpl->submit(boost::forward(closure)); } /** @@ -293,15 +394,16 @@ namespace executors template bool reschedule_until(Pred const& pred) { - do { - if ( ! try_executing_one()) - { - return false; - } - } while (! pred()); - return true; + return pimpl->reschedule_until(pred); } + void schedule_one_or_yield() + { + return pimpl->schedule_one_or_yield(); + } + + private: + shared_ptr pimpl; }; } using executors::basic_thread_pool; diff --git a/include/boost/thread/executors/detail/priority_executor_base.hpp b/include/boost/thread/executors/detail/priority_executor_base.hpp index 2191c0b3..90b2e9e5 100644 --- a/include/boost/thread/executors/detail/priority_executor_base.hpp +++ b/include/boost/thread/executors/detail/priority_executor_base.hpp @@ -24,7 +24,6 @@ namespace detail class priority_executor_base { public: - //typedef boost::function work; typedef executors::work_pq work; protected: typedef Queue queue_type; diff --git a/include/boost/thread/executors/executor.hpp b/include/boost/thread/executors/executor.hpp index 1075bce7..031af1f7 100644 --- a/include/boost/thread/executors/executor.hpp +++ b/include/boost/thread/executors/executor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2013,2014 Vicente J. Botet Escriba +// Copyright (C) 2013,2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -27,8 +27,6 @@ namespace boost /// type-erasure to store the works to do typedef executors::work work; - /// executor is not copyable. - BOOST_THREAD_NO_COPYABLE(executor) executor() {} /** @@ -128,7 +126,6 @@ namespace boost bool reschedule_until(Pred const& pred) { do { - //schedule_one_or_yield(); if ( ! try_executing_one()) { return false; diff --git a/include/boost/thread/executors/executor_adaptor.hpp b/include/boost/thread/executors/executor_adaptor.hpp index ebe4e347..c4793fd0 100644 --- a/include/boost/thread/executors/executor_adaptor.hpp +++ b/include/boost/thread/executors/executor_adaptor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2013,2014 Vicente J. Botet Escriba +// Copyright (C) 2013,2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -30,9 +30,8 @@ namespace executors /// type-erasure to store the works to do typedef executor::work work; - /// executor is not copyable. - BOOST_THREAD_NO_COPYABLE(executor_adaptor) - +// executor_adaptor(executor_adaptor const&) = default; +// executor_adaptor(executor_adaptor &&) = default; /** * executor_adaptor constructor */ diff --git a/include/boost/thread/executors/generic_executor.hpp b/include/boost/thread/executors/generic_executor.hpp new file mode 100644 index 00000000..0513371c --- /dev/null +++ b/include/boost/thread/executors/generic_executor.hpp @@ -0,0 +1,150 @@ +// Copyright (C) 2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP +#define BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP + +#include + +#include +#include +#include + +#include +#include +#include + +#include + +namespace boost +{ + namespace executors + { + + class generic_executor + { + shared_ptr ex; + public: + /// type-erasure to store the works to do + typedef executors::work work; + + //generic_executor(generic_executor const&) = default; + //generic_executor(generic_executor &&) = default; + + template + generic_executor(Executor const& ex + , typename boost::disable_if, + int* >::type = (int*)0 + ) + //: ex(make_shared >(ex)) // todo check why this doesn't work with C++03 + : ex( new executor_adaptor(ex) ) + { + } + + //generic_executor(generic_executor const& other) noexcept {} + //generic_executor& operator=(generic_executor const& other) noexcept {} + + + /** + * \par Effects + * Close the \c executor for submissions. + * The worker threads will work until there is no more closures to run. + */ + void close() { ex->close(); } + + /** + * \par Returns + * Whether the pool is closed for submissions. + */ + bool closed() { return ex->closed(); } + + void submit(BOOST_THREAD_RV_REF(work) closure) + { + ex->submit(boost::forward(closure)); + } + + /** + * \par Requires + * \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible. + * + * \par Effects + * The specified closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads. + * + * \par Synchronization + * Completion of closure on a particular thread happens before destruction of thread's thread local variables. + * + * \par Throws + * \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work w ((closure)); + submit(boost::move(w)); + } +#endif + void submit(void (*closure)()) + { + work w ((closure)); + submit(boost::move(w)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work w = boost::move(closure); + submit(boost::move(w)); + } + +// size_t num_pending_closures() const +// { +// return ex->num_pending_closures(); +// } + + /** + * \par Effects + * Try to execute one task. + * + * \par Returns + * Whether a task has been executed. + * + * \par Throws + * Whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { return ex->try_executing_one(); } + + /** + * \par Requires + * This must be called from an scheduled task. + * + * \par Effects + * reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + //schedule_one_or_yield(); + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + + }; + } + using executors::generic_executor; +} + +#include + +#endif diff --git a/include/boost/thread/executors/generic_executor_ref.hpp b/include/boost/thread/executors/generic_executor_ref.hpp index 57609c91..b2080e01 100644 --- a/include/boost/thread/executors/generic_executor_ref.hpp +++ b/include/boost/thread/executors/generic_executor_ref.hpp @@ -13,7 +13,8 @@ #include #include -#include +#include +#include #include @@ -99,8 +100,8 @@ namespace boost template generic_executor_ref(Executor& ex) - //: ex(make_shared >(ex)) // todo check why this doesn't works with C++03 - : ex( new executor_ref(ex) ) + //: ex(make_shared::type> >(ex)) // todo check why this doesn't work with C++03 + : ex( new executor_ref::type>(ex) ) { } diff --git a/include/boost/thread/executors/generic_serial_executor.hpp b/include/boost/thread/executors/generic_serial_executor.hpp new file mode 100644 index 00000000..efa9077a --- /dev/null +++ b/include/boost/thread/executors/generic_serial_executor.hpp @@ -0,0 +1,284 @@ +// Copyright (C) 2013,2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// 2013/11 Vicente J. Botet Escriba +// first implementation of a simple serial scheduler. + +#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP +#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +namespace boost +{ +namespace executors +{ + class generic_serial_executor + { + public: + /// type-erasure to store the works to do + typedef executors::work work; + private: + + struct shared_state { + typedef executors::work work; + typedef scoped_thread<> thread_t; + + /// the thread safe work queue + concurrent::sync_queue work_queue; + generic_executor ex; + thread_t thr; + + struct try_executing_one_task { + work& task; + boost::promise &p; + try_executing_one_task(work& task, boost::promise &p) + : task(task), p(p) {} + void operator()() { + try { + task(); + p.set_value(); + } catch (...) + { + p.set_exception(current_exception()); + } + } + }; + public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex; } + + private: + + /** + * The main loop of the worker thread + */ + void worker_thread() + { + try + { + for(;;) + { + work task; + queue_op_status st = work_queue.wait_pull(task); + if (st == queue_op_status::closed) return; + + boost::promise p; + try_executing_one_task tmp(task,p); + ex.submit(tmp); + p.get_future().wait(); + } + } + catch (...) + { + std::terminate(); + return; + } + } + + public: + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + template + shared_state(Executor const& ex) + : ex(ex), thr(&shared_state::worker_thread, this) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor. + */ + ~shared_state() + { + // signal to the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c generic_serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work_queue.push(work(closure)); + } + #endif + void submit(void (*closure)()) + { + work_queue.push(work(closure)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work_queue.push(work(boost::forward(closure))); + } + + }; + + public: + +// generic_serial_executor(generic_serial_executor const&) = default; +// generic_serial_executor(generic_serial_executor &&) = default; + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + template + generic_serial_executor(Executor const& ex + , typename boost::disable_if, + int* >::type = (int*)0) + : pimpl(make_shared(ex)) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor destructor. + */ + ~generic_serial_executor() + { + } + + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor& underlying_executor() BOOST_NOEXCEPT + { + return pimpl->underlying_executor(); + } + + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + bool try_executing_one() + { + return false; + } + + /** + * \b Effects: close the \c generic_serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return pimpl->closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + pimpl->submit(closure); + } +#endif + void submit(void (*closure)()) + { + pimpl->submit(closure); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + pimpl->submit(boost::forward(closure)); + } + + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + template + bool reschedule_until(Pred const& pred) + { + return false; + } + private: + shared_ptr pimpl; + }; +} +using executors::generic_serial_executor; +} + +#include + +#endif diff --git a/include/boost/thread/executors/generic_serial_executor_cont.hpp b/include/boost/thread/executors/generic_serial_executor_cont.hpp new file mode 100644 index 00000000..1c6cf1fe --- /dev/null +++ b/include/boost/thread/executors/generic_serial_executor_cont.hpp @@ -0,0 +1,265 @@ +// Copyright (C) 2015 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// 2013/11 Vicente J. Botet Escriba +// first implementation of a simple serial scheduler. + +#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP +#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace boost +{ +namespace executors +{ + class generic_serial_executor_cont + { + public: + /// type-erasure to store the works to do + typedef executors::work work; + private: + + struct shared_state { + typedef executors::work work; + + generic_executor ex_; + future fut_; // protected by mtx_ + bool closed_; // protected by mtx_ + mutex mtx_; + + struct continuation { + work task; + template + struct result { + typedef void type; + }; + continuation(BOOST_THREAD_RV_REF(work) tsk) + : task(boost::move(tsk)) {} + void operator()(future f) + { + try { + task(); + } catch (...) { + std::terminate(); + } + } + }; + + bool closed(lock_guard&) const + { + return closed_; + } + public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex_; } + + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + * + * \b Notes: + * * The lifetime of the associated executor must outlive the serial executor. + * * The current implementation doesn't support submission from synchronous continuation, that is, + * - the executor must execute the continuation asynchronously or + * - the continuation can not submit to this serial executor. + */ + template + shared_state(Executor const& ex) + : ex_(ex), fut_(make_ready_future()), closed_(false) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor_cont destructor. + */ + ~shared_state() + { + // signal to the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c generic_serial_executor_cont for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true;; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish. + * If the invoked closure throws an exception the \c generic_serial_executor_cont will call \c std::terminate, as is the case with threads. + * + * \b Throws: \c sync_queue_is_closed if the executor is closed. + * Whatever exception that can be throw while storing the closure. + * + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } +#endif + void submit(void (*closure)()) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(boost::forward(closure)))); + } + }; + public: + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + template + generic_serial_executor_cont(Executor const& ex + , typename boost::disable_if, + int* >::type = (int*)0) + : pimpl(make_shared(ex)) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor. + */ + ~generic_serial_executor_cont() + { + } + + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor& underlying_executor() BOOST_NOEXCEPT + { + return pimpl->underlying_executor(); + } + + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + bool try_executing_one() + { + return false; + } + + /** + * \b Effects: close the \c serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return pimpl->closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + pimpl->submit(closure); + } +#endif + void submit(void (*closure)()) + { + pimpl->submit(closure); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + pimpl->submit(boost::forward(closure)); + } + + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + template + bool reschedule_until(Pred const& pred) + { + return false; + } + private: + shared_ptr pimpl; + }; +} +using executors::generic_serial_executor_cont; +} + +#include + +#endif diff --git a/include/boost/thread/executors/inline_executor.hpp b/include/boost/thread/executors/inline_executor.hpp index 5dd52318..ee5a4f1e 100644 --- a/include/boost/thread/executors/inline_executor.hpp +++ b/include/boost/thread/executors/inline_executor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014 Vicente J. Botet Escriba +// Copyright (C) 2014-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -14,6 +14,9 @@ #include #include +#include +#include + #include namespace boost @@ -25,142 +28,227 @@ namespace executors public: /// type-erasure to store the works to do typedef executors::work work; - bool closed_; - mutable mutex mtx_; - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - return false; - } + private: + struct shared_state { + typedef executors::work work; + + bool closed_; + mutable mutex mtx_; + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return false; + } + + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + shared_state() + : closed_(false) + { + } + /** + * \b Effects: Destroys the inline executor. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c inline_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed(lock_guard& ) const + { + return closed_; + } + bool closed() const + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + #endif + void submit(void (*closure)()) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& ) + { + return false; + } + }; public: - /// inline_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(inline_executor) + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + inline_executor() + : pimpl(make_shared()) + { + } - /** - * \b Effects: creates a inline executor that runs closures immediately. - * - * \b Throws: Nothing. - */ - inline_executor() - : closed_(false) - { - } - /** - * \b Effects: Destroys the inline executor. - * - * \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor. - */ - ~inline_executor() - { - // signal to all the worker thread that there will be no more submissions. - close(); - } + /** + * \b Effects: close the \c inline_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } - /** - * \b Effects: close the \c inline_executor for submissions. - * The loop will work until there is no more closures to run. - */ - void close() - { - lock_guard lk(mtx_); - closed_ = true; - } + /** + * \b Returns: whether the executor is closed for submissions. + */ + bool closed() const + { + return pimpl->closed(); + } - /** - * \b Returns: whether the pool is closed for submissions. - */ - bool closed(lock_guard& ) - { - return closed_; - } - bool closed() - { - lock_guard lk(mtx_); - return closed(lk); - } - - /** - * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. - * - * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. - * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. - * - * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. - * - * \b Throws: \c sync_queue_is_closed if the thread pool is closed. - * Whatever exception that can be throw while storing the closure. - */ + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) - template - void submit(Closure & closure) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + template + void submit(Closure & closure) + { + pimpl->submit(closure); + } #endif - void submit(void (*closure)()) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + void submit(void (*closure)()) + { + pimpl->submit(closure); + } - template - void submit(BOOST_THREAD_FWD_REF(Closure) closure) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + pimpl->submit(boost::forward(closure)); + } - /** - * \b Requires: This must be called from an scheduled task. - * - * \b Effects: reschedule functions until pred() - */ - template - bool reschedule_until(Pred const& ) - { - return false; - } + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); + } + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& p) + { + return pimpl->reschedule_until(p); + } + private: + shared_ptr pimpl; }; } using executors::inline_executor; diff --git a/include/boost/thread/executors/loop_executor.hpp b/include/boost/thread/executors/loop_executor.hpp index e9eadadf..058122bb 100644 --- a/include/boost/thread/executors/loop_executor.hpp +++ b/include/boost/thread/executors/loop_executor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2013,2014 Vicente J. Botet Escriba +// Copyright (C) 2013-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -17,6 +17,9 @@ #include #include +#include +#include + #include namespace boost @@ -30,59 +33,170 @@ namespace executors /// type-erasure to store the works to do typedef executors::work work; private: - /// the thread safe work queue - concurrent::sync_queue work_queue; - public: - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - work task; - try + struct shared_state { + typedef executors::work work; + /// the thread safe work queue + concurrent::sync_queue work_queue; + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { - if (work_queue.try_pull(task) == queue_op_status::success) + work task; + try { + if (work_queue.try_pull(task) == queue_op_status::success) + { + task(); + return true; + } + return false; + } + catch (...) + { + std::terminate(); + return false; + } + } + /** + * Effects: schedule one task or yields + * Throws: whatever the current task constructor throws or the task() throws. + */ + void schedule_one_or_yield() + { + if ( ! try_executing_one()) + { + this_thread::yield(); + } + } + + /// loop_executor is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + shared_state() + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + } + + /** + * The main loop of the worker thread + */ + void loop() + { + while (!closed()) + { + schedule_one_or_yield(); + } + while (try_executing_one()) + { + } + } + + /** + * \b Effects: close the \c loop_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work_queue.push(work(closure)); + } + #endif + void submit(void (*closure)()) + { + work_queue.push(work(closure)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work_queue.push(work(boost::forward(closure))); + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + + /** + * run queued closures + */ + void run_queued_closures() + { + sync_queue::underlying_queue_type q = work_queue.underlying_queue(); + while (! q.empty()) + { + work& task = q.front(); task(); - return true; + q.pop_front(); } - return false; } - catch (...) - { - std::terminate(); - return false; - } - } - private: - /** - * Effects: schedule one task or yields - * Throws: whatever the current task constructor throws or the task() throws. - */ - void schedule_one_or_yield() - { - if ( ! try_executing_one()) - { - this_thread::yield(); - } - } - - - + }; public: - /// loop_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(loop_executor) - /** * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ loop_executor() + : pimpl(make_shared()) { } /** @@ -92,22 +206,33 @@ namespace executors */ ~loop_executor() { - // signal to all the worker thread that there will be no more submissions. - close(); } + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); + } +// /** +// * Effects: schedule one task or yields +// * Throws: whatever the current task constructor throws or the task() throws. +// */ +// void schedule_one_or_yield() +// { +// return pimpl->schedule_one_or_yield(); +// } + + /** * The main loop of the worker thread */ void loop() { - while (!closed()) - { - schedule_one_or_yield(); - } - while (try_executing_one()) - { - } + pimpl->loop(); } /** @@ -116,7 +241,7 @@ namespace executors */ void close() { - work_queue.close(); + pimpl->close(); } /** @@ -124,7 +249,7 @@ namespace executors */ bool closed() { - return work_queue.closed(); + return pimpl->closed(); } /** @@ -143,18 +268,18 @@ namespace executors template void submit(Closure & closure) { - work_queue.push(work(closure)); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - work_queue.push(work(closure)); + pimpl->submit(closure); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push(work(boost::forward(closure))); + pimpl->submit(boost::forward(closure)); } /** @@ -165,13 +290,7 @@ namespace executors template bool reschedule_until(Pred const& pred) { - do { - if ( ! try_executing_one()) - { - return false; - } - } while (! pred()); - return true; + return pimpl->reschedule_until(pred); } /** @@ -179,15 +298,10 @@ namespace executors */ void run_queued_closures() { - sync_queue::underlying_queue_type q = work_queue.underlying_queue(); - while (! q.empty()) - { - work& task = q.front(); - task(); - q.pop_front(); - } + pimpl->run_queued_closures(); } - + private: + shared_ptr pimpl; }; } using executors::loop_executor; diff --git a/include/boost/thread/executors/scheduled_thread_pool.hpp b/include/boost/thread/executors/scheduled_thread_pool.hpp index 408013b2..c13d79e4 100644 --- a/include/boost/thread/executors/scheduled_thread_pool.hpp +++ b/include/boost/thread/executors/scheduled_thread_pool.hpp @@ -9,36 +9,123 @@ #define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP #include +#include +#include +#include +#include +#include +#include namespace boost { namespace executors { - class scheduled_thread_pool : public detail::scheduled_executor_base<> + template + class scheduled_thread_pool { private: - thread_group _workers; - public: - scheduled_thread_pool(size_t num_threads) : super() - { - for(size_t i = 0; i < num_threads; i++) + struct shared_state : public detail::scheduled_executor_base<> { + + /// basic_thread_pool is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + typedef detail::scheduled_executor_base<> super; + typedef typename super::work work; + + typedef scoped_thread<> thread_t; + typedef csbl::vector thread_vector; + thread_vector threads; + + shared_state(unsigned const thread_count = thread::hardware_concurrency()+1) : super() { - _workers.create_thread(bind(&super::loop, this)); + + try + { + threads.reserve(thread_count); + for (unsigned i = 0; i < thread_count; ++i) + { + #if 1 + thread th (&shared_state::loop, this); + threads.push_back(thread_t(boost::move(th))); + #else + threads.push_back(thread_t(&shared_state::loop, this)); // do not compile + #endif + } + } + catch (...) + { + close(); + throw; + } } + + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor. + */ + ~shared_state() + { + this->close(); + } + }; //end class + + public: + typedef typename shared_state::work work; + typedef Clock clock; + typedef typename clock::duration duration; + typedef typename clock::time_point time_point; + + + /** + * \b Effects: creates a thread pool that runs closures on \c thread_count threads. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + scheduled_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1) + : pimpl(make_shared(thread_count)) + { } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor. + */ ~scheduled_thread_pool() { - this->close(); - _workers.join_all(); + } + /** + * \b Effects: close the \c serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); } - private: - typedef detail::scheduled_executor_base<> super; - }; //end class + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return pimpl->closed(); + } + void submit_at(work w, const time_point& tp) + { + return pimpl->submit_at(boost::move(w), tp); + } + + void submit_after(work w, const duration& d) + { + return pimpl->submit_after(boost::move(w), d); + } + private: + shared_ptr pimpl; + }; } //end executors namespace using executors::scheduled_thread_pool; diff --git a/include/boost/thread/executors/scheduler.hpp b/include/boost/thread/executors/scheduler.hpp index 5796a7d3..12a51146 100644 --- a/include/boost/thread/executors/scheduler.hpp +++ b/include/boost/thread/executors/scheduler.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014 Vicente J. Botet Escriba +// Copyright (C) 2014-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -13,6 +13,8 @@ #include #include #include +#include +#include #include @@ -36,7 +38,7 @@ namespace boost } private: - Executor& ex; + Executor ex; Function funct; }; @@ -100,8 +102,8 @@ namespace boost } private: - Scheduler& sch; - Executor& ex; + Scheduler sch; + Executor ex; typename clock::time_point tp; bool is_closed; }; @@ -150,8 +152,8 @@ namespace boost } private: - Scheduler& sch; - Executor& ex; + Scheduler sch; + Executor ex; }; //end class /// Wraps a reference to a @c Scheduler providing an @c Executor that @@ -208,7 +210,7 @@ namespace boost } private: - Scheduler& sch; + Scheduler sch; time_point tp; bool is_closed; }; //end class @@ -217,22 +219,71 @@ namespace boost /// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor /// that submit the work at/after a specific time/duration respectively. template - class scheduler : public detail::scheduled_executor_base + class scheduler { - public: - typedef typename detail::scheduled_executor_base::work work; + private: + struct shared_state : public detail::scheduled_executor_base { + typedef detail::scheduled_executor_base super; + typedef typename super::work work; + thread thr; + + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + shared_state() + : super(), + thr(&super::loop, this) {} + + ~shared_state() + { + this->close(); + thr.join(); + } + + }; + + public: + typedef typename shared_state::work work; typedef Clock clock; + typedef typename clock::duration duration; + typedef typename clock::time_point time_point; scheduler() - : super(), - thr(&super::loop, this) {} + : pimpl(make_shared()) + {} ~scheduler() { - this->close(); - thr.join(); } + + /** + * \b Effects: close the \c serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return pimpl->closed(); + } + + void submit_at(work w, const time_point& tp) + { + return pimpl->submit_at(boost::move(w), tp); + } + + void submit_after(work w, const duration& d) + { + return pimpl->submit_after(boost::move(w), d); + } + template scheduler_executor_wrapper on(Ex& ex) { @@ -250,13 +301,10 @@ namespace boost { return at_executor(*this, tp); } - private: - typedef detail::scheduled_executor_base super; - thread thr; + shared_ptr pimpl; }; - } using executors::resubmitter; using executors::resubmit; diff --git a/include/boost/thread/executors/scheduling_adaptor.hpp b/include/boost/thread/executors/scheduling_adaptor.hpp index ac0a0acb..d345558a 100644 --- a/include/boost/thread/executors/scheduling_adaptor.hpp +++ b/include/boost/thread/executors/scheduling_adaptor.hpp @@ -19,7 +19,7 @@ namespace executors class scheduling_adpator : public detail::scheduled_executor_base<> { private: - Executor& _exec; + Executor _exec; thread _scheduler; public: diff --git a/include/boost/thread/executors/serial_executor.hpp b/include/boost/thread/executors/serial_executor.hpp index 6f426666..3ad3fadb 100644 --- a/include/boost/thread/executors/serial_executor.hpp +++ b/include/boost/thread/executors/serial_executor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2013 Vicente J. Botet Escriba +// Copyright (C) 2013,2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -14,116 +14,168 @@ #include #include #include -#include +#include #include #include +#include +#include + #include namespace boost { namespace executors { + template class serial_executor { public: /// type-erasure to store the works to do typedef executors::work work; private: - typedef scoped_thread<> thread_t; - /// the thread safe work queue - concurrent::sync_queue work_queue; - generic_executor_ref ex; - thread_t thr; + struct shared_state { + typedef executors::work work; + typedef scoped_thread<> thread_t; - struct try_executing_one_task { - work& task; - boost::promise &p; - try_executing_one_task(work& task, boost::promise &p) - : task(task), p(p) {} - void operator()() { - try { - task(); - p.set_value(); - } catch (...) + /// the thread safe work queue + concurrent::sync_queue work_queue; + Executor ex; + thread_t thr; + + struct try_executing_one_task { + work& task; + boost::promise &p; + try_executing_one_task(work& task, boost::promise &p) + : task(task), p(p) {} + void operator()() { + try { + task(); + p.set_value(); + } catch (...) + { + p.set_exception(current_exception()); + } + } + }; + public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + Executor& underlying_executor() BOOST_NOEXCEPT { return ex; } + + private: + + /** + * The main loop of the worker thread + */ + void worker_thread() + { + try { - p.set_exception(current_exception()); + for(;;) + { + work task; + queue_op_status st = work_queue.wait_pull(task); + if (st == queue_op_status::closed) return; + + boost::promise p; + try_executing_one_task tmp(task,p); + ex.submit(tmp); + p.get_future().wait(); + } + } + catch (...) + { + std::terminate(); + return; } } + + public: + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + shared_state(Executor& ex) + : ex(ex), thr(&shared_state::worker_thread, this) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor. + */ + ~shared_state() + { + // signal to the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work_queue.push(work(closure)); + } + #endif + void submit(void (*closure)()) + { + work_queue.push(work(closure)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work_queue.push(work(boost::forward(closure))); + } + }; - public: - /** - * \par Returns - * The underlying executor wrapped on a generic executor reference. - */ - generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } - - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - work task; - try - { - if (work_queue.try_pull(task) == queue_op_status::success) - { - boost::promise p; - try_executing_one_task tmp(task,p); - ex.submit(tmp); - p.get_future().wait(); - return true; - } - return false; - } - catch (...) - { - std::terminate(); - return false; - } - } - private: - /** - * Effects: schedule one task or yields - * Throws: whatever the current task constructor throws or the task() throws. - */ - void schedule_one_or_yield() - { - if ( ! try_executing_one()) - { - this_thread::yield(); - } - } - - /** - * The main loop of the worker thread - */ - void worker_thread() - { - while (!closed()) - { - schedule_one_or_yield(); - } - while (try_executing_one()) - { - } - } public: - /// serial_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(serial_executor) /** * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ - template serial_executor(Executor& ex) - : ex(ex), thr(&serial_executor::worker_thread, this) + : pimpl(make_shared(ex)) { } /** @@ -133,8 +185,24 @@ namespace executors */ ~serial_executor() { - // signal to the worker thread that there will be no more submissions. - close(); + } + + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + Executor& underlying_executor() BOOST_NOEXCEPT + { + return pimpl->underlying_executor(); + } + + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + bool try_executing_one() + { + return false; } /** @@ -143,7 +211,7 @@ namespace executors */ void close() { - work_queue.close(); + pimpl->close(); } /** @@ -151,7 +219,7 @@ namespace executors */ bool closed() { - return work_queue.closed(); + return pimpl->closed(); } /** @@ -170,37 +238,31 @@ namespace executors template void submit(Closure & closure) { - work_queue.push(work(closure)); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - work_queue.push(work(closure)); + pimpl->submit(closure); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push(work(boost::forward(closure))); + pimpl->submit(boost::forward(closure)); } /** - * \b Requires: This must be called from an scheduled task. - * - * \b Effects: reschedule functions until pred() + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. */ template bool reschedule_until(Pred const& pred) { - do { - if ( ! try_executing_one()) - { - return false; - } - } while (! pred()); - return true; + return false; } - + private: + shared_ptr pimpl; }; } using executors::serial_executor; diff --git a/include/boost/thread/executors/serial_executor_cont.hpp b/include/boost/thread/executors/serial_executor_cont.hpp index 1c4cc14a..c8d45b32 100644 --- a/include/boost/thread/executors/serial_executor_cont.hpp +++ b/include/boost/thread/executors/serial_executor_cont.hpp @@ -12,18 +12,20 @@ #include #include #include -#include #include -#include #include #include +#include +#include + #include namespace boost { namespace executors { + template class serial_executor_cont { public: @@ -31,93 +33,156 @@ namespace executors typedef executors::work work; private: - generic_executor_ref ex_; - future fut_; // protected by mtx_ - bool closed_; // protected by mtx_ - mutex mtx_; + struct shared_state { + typedef executors::work work; - struct continuation { - work task; - template - struct result { - typedef void type; - }; - continuation(BOOST_THREAD_RV_REF(work) tsk) - : task(boost::move(tsk)) {} - void operator()(future f) - { - try { - task(); - } catch (...) { - std::terminate(); + Executor ex_; + future fut_; // protected by mtx_ + bool closed_; // protected by mtx_ + mutex mtx_; + + struct continuation { + work task; + template + struct result { + typedef void type; + }; + continuation(BOOST_THREAD_RV_REF(work) tsk) + : task(boost::move(tsk)) {} + void operator()(future f) + { + try { + task(); + } catch (...) { + std::terminate(); + } } + }; + + bool closed(lock_guard&) const + { + return closed_; + } + public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + Executor& underlying_executor() BOOST_NOEXCEPT { return ex_; } + + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + * + * \b Notes: + * * The lifetime of the associated executor must outlive the serial executor. + * * The current implementation doesn't support submission from synchronous continuation, that is, + * - the executor must execute the continuation asynchronously or + * - the continuation can not submit to this serial executor. + */ + shared_state(Executor& ex) + : ex_(ex), fut_(make_ready_future()), closed_(false) + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor. + */ + ~shared_state() + { + // signal to the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c serial_executor_cont for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true;; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish. + * If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads. + * + * \b Throws: \c sync_queue_is_closed if the executor is closed. + * Whatever exception that can be throw while storing the closure. + * + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } + #endif + void submit(void (*closure)()) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(boost::forward(closure)))); } }; - - bool closed(lock_guard&) const - { - return closed_; - } public: /** - * \par Returns - * The underlying executor wrapped on a generic executor reference. - */ - generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; } - - /// serial_executor_cont is not copyable. - BOOST_THREAD_NO_COPYABLE(serial_executor_cont) - - /** - * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * * \b Throws: Whatever exception is thrown while initializing the needed resources. - * - * \b Notes: - * * The lifetime of the associated executor must outlive the serial executor. - * * The current implementation doesn't support submission from synchronous continuation, that is, - * - the executor must execute the continuation asynchronously or - * - the continuation can not submit to this serial executor. */ - template serial_executor_cont(Executor& ex) - : ex_(ex), fut_(make_ready_future()), closed_(false) + : pimpl(make_shared(ex)) { } /** * \b Effects: Destroys the thread pool. * - * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor. + * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor. */ ~serial_executor_cont() { - // signal to the worker thread that there will be no more submissions. - close(); } /** - * \b Effects: close the \c serial_executor_cont for submissions. - * The loop will work until there is no more closures to run. + * \par Returns + * The underlying executor wrapped on a generic executor reference. */ - void close() + Executor& underlying_executor() BOOST_NOEXCEPT { - lock_guard lk(mtx_); - closed_ = true;; + return pimpl->underlying_executor(); } /** - * \b Returns: whether the pool is closed for submissions. - */ - bool closed() - { - lock_guard lk(mtx_); - return closed(lk); - } - - /** - * Effects: none. - * Returns: always false. - * Throws: No. + * \b Returns: always false as a serial executor can not re-enter. * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. */ bool try_executing_one() @@ -125,41 +190,64 @@ namespace executors return false; } + /** + * \b Effects: close the \c serial_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return pimpl->closed(); + } + /** * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. * - * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish. - * If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads. + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. * - * \b Throws: \c sync_queue_is_closed if the executor is closed. + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. * Whatever exception that can be throw while storing the closure. - * */ #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) template void submit(Closure & closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - fut_ = fut_.then(ex_, continuation(work(closure))); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - fut_ = fut_.then(ex_, continuation(work(closure))); + pimpl->submit(closure); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - fut_ = fut_.then(ex_, continuation(work(boost::forward(closure)))); + pimpl->submit(boost::forward(closure)); } + /** + * \b Returns: always false as a serial executor can not re-enter. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + template + bool reschedule_until(Pred const& pred) + { + return false; + } + private: + shared_ptr pimpl; }; } using executors::serial_executor_cont; diff --git a/include/boost/thread/executors/thread_executor.hpp b/include/boost/thread/executors/thread_executor.hpp index a8cd5c21..e2e0b001 100644 --- a/include/boost/thread/executors/thread_executor.hpp +++ b/include/boost/thread/executors/thread_executor.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014 Vicente J. Botet Escriba +// Copyright (C) 2014-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -18,6 +18,9 @@ #include #include +#include +#include + #include namespace boost @@ -29,33 +32,134 @@ namespace executors public: /// type-erasure to store the works to do typedef executors::work work; - bool closed_; - typedef scoped_thread<> thread_t; - typedef csbl::vector threads_type; - threads_type threads_; - mutable mutex mtx_; + private: - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - return false; - } + struct shared_state { + typedef executors::work work; + bool closed_; + typedef scoped_thread<> thread_t; + typedef csbl::vector threads_type; + threads_type threads_; + mutable mutex mtx_; + /// thread_executor is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + shared_state() + : closed_(false) + { + } + /** + * \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + // all the scoped threads will join before destroying + } + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return false; + } + + /** + * \b Effects: close the \c thread_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed(lock_guard& ) + { + return closed_; + } + bool closed() + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c thread_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(closure); + threads_.push_back(thread_t(boost::move(th))); + } + #endif + void submit(void (*closure)()) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(closure); + threads_.push_back(thread_t(boost::move(th))); + } + + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(boost::forward(closure)); + threads_.push_back(thread_t(boost::move(th))); + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const&) + { + return false; + } + }; public: - /// thread_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(thread_executor) - /** * \b Effects: creates a inline executor that runs closures immediately. * * \b Throws: Nothing. */ thread_executor() - : closed_(false) + : pimpl(make_shared()) { } /** @@ -65,9 +169,16 @@ namespace executors */ ~thread_executor() { - // signal to all the worker thread that there will be no more submissions. - close(); - // all the scoped threads will join before destroying + } + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); } /** @@ -76,21 +187,15 @@ namespace executors */ void close() { - lock_guard lk(mtx_); - closed_ = true; + pimpl->close(); } /** * \b Returns: whether the pool is closed for submissions. */ - bool closed(lock_guard& ) - { - return closed_; - } bool closed() { - lock_guard lk(mtx_); - return closed(lk); + return pimpl->closed(); } /** @@ -109,30 +214,18 @@ namespace executors template void submit(Closure & closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(closure); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(closure); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(closure); } template void submit(BOOST_THREAD_FWD_REF(Closure) closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(boost::forward(closure)); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(boost::forward(closure)); } /** @@ -141,12 +234,14 @@ namespace executors * \b Effects: reschedule functions until pred() */ template - bool reschedule_until(Pred const&) + bool reschedule_until(Pred const& p) { - return false; + return pimpl->reschedule_until(p); } - + private: + shared_ptr pimpl; }; + } using executors::thread_executor; } diff --git a/include/boost/thread/experimental/parallel/v2/task_region.hpp b/include/boost/thread/experimental/parallel/v2/task_region.hpp index 3a278c50..63aad9b0 100755 --- a/include/boost/thread/experimental/parallel/v2/task_region.hpp +++ b/include/boost/thread/experimental/parallel/v2/task_region.hpp @@ -116,9 +116,9 @@ BOOST_THREAD_INLINE_NAMESPACE(v2) template friend void task_region_final(BOOST_THREAD_FWD_REF(F) f); template - friend void task_region(Ex&, BOOST_THREAD_FWD_REF(F) f); + friend void task_region(Ex const&, BOOST_THREAD_FWD_REF(F) f); template - friend void task_region_final(Ex&, BOOST_THREAD_FWD_REF(F) f); + friend void task_region_final(Ex const&, BOOST_THREAD_FWD_REF(F) f); void wait_all() { @@ -153,21 +153,20 @@ protected: #if defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS task_region_handle_gen() : canceled(false) - , ex(0) {} - task_region_handle_gen(Executor& ex) + task_region_handle_gen(Executor const& ex) : canceled(false) - , ex(&ex) + , ex(ex) {} #endif #if ! defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS task_region_handle_gen() - : ex(0) + //: ex(0) {} - task_region_handle_gen(Executor& ex) - : ex(&ex) + task_region_handle_gen(Executor const& ex) + : ex(ex) {} #endif @@ -188,7 +187,7 @@ protected: bool canceled; #endif #if defined BOOST_THREAD_PROVIDES_EXECUTORS - Executor* ex; + Executor ex; #endif exception_list exs; typedef csbl::vector > group_type; @@ -211,13 +210,13 @@ protected: } } #if defined BOOST_THREAD_PROVIDES_EXECUTORS - group.push_back(async(*ex, detail::wrapped, F>(*this, forward(f)))); + group.push_back(async(ex, detail::wrapped, F>(*this, forward(f)))); #else group.push_back(async(detail::wrapped, F>(*this, forward(f)))); #endif #else #if defined BOOST_THREAD_PROVIDES_EXECUTORS - group.push_back(async(*ex, forward(f))); + group.push_back(async(ex, forward(f))); #else group.push_back(async(forward(f))); #endif @@ -245,17 +244,18 @@ protected: class task_region_handle : public task_region_handle_gen { - default_executor tp; + //default_executor tp; template friend void task_region(BOOST_THREAD_FWD_REF(F) f); template friend void task_region_final(BOOST_THREAD_FWD_REF(F) f); protected: - task_region_handle() : task_region_handle_gen() + task_region_handle() + : task_region_handle_gen() { #if defined BOOST_THREAD_PROVIDES_EXECUTORS - ex = &tp; + //ex = &tp; #endif } BOOST_DELETED_FUNCTION(task_region_handle(const task_region_handle&)) @@ -265,7 +265,7 @@ protected: }; template - void task_region_final(Executor& ex, BOOST_THREAD_FWD_REF(F) f) + void task_region_final(Executor const& ex, BOOST_THREAD_FWD_REF(F) f) { task_region_handle_gen tr(ex); try @@ -280,7 +280,7 @@ protected: } template - void task_region(Executor& ex, BOOST_THREAD_FWD_REF(F) f) + void task_region(Executor const& ex, BOOST_THREAD_FWD_REF(F) f) { task_region_final(ex, forward(f)); } diff --git a/test/Jamfile.v2 b/test/Jamfile.v2 index c8656374..5761c621 100644 --- a/test/Jamfile.v2 +++ b/test/Jamfile.v2 @@ -810,7 +810,10 @@ rule thread-compile ( sources : reqs * : name ) [ thread-run2 ../example/user_scheduler.cpp : ex_user_scheduler ] #[ thread-run2 ../example/executor.cpp : ex_executor ] #[ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref ] + #[ thread-run2 ../example/generic_executor.cpp : ex_generic_executor ] + [ thread-run2 ../example/generic_serial_executor.cpp : ex_generic_serial_executor ] [ thread-run2 ../example/serial_executor.cpp : ex_serial_executor ] + #[ thread-run2 ../example/generic_serial_executor_cont.cpp : ex_generic_serial_executor_cont ] [ thread-run2 ../example/serial_executor_cont.cpp : ex_serial_executor_cont ] [ thread-run2 ../example/future_when_all.cpp : ex_future_when_all ] #[ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate ] @@ -959,13 +962,15 @@ rule thread-compile ( sources : reqs * : name ) test-suite ts_ : #[ thread-run test_11256.cpp ] - #[ thread-run2 ../example/executor.cpp : ex_executor2 ] - #[ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref2 ] - #[ thread-run2-noit ./sync/futures/async/async_executor_pass.cpp : async__async_executor_p2 ] - #[ thread-run2 ../example/fib_task_region.cpp : ex_fib_task_region2 ] - #[ thread-run2-noit ./experimental/parallel/v2/task_region_pass.cpp : task_region_p2 ] - #[ thread-run2 ../example/parallel_quick_sort.cpp : ex_parallel_quick_sort2 ] - #[ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate2 ] + [ thread-run2 ../example/executor.cpp : ex_executor2 ] + [ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref2 ] + [ thread-run2 ../example/generic_executor.cpp : ex_generic_executor2 ] + [ thread-run2 ../example/generic_serial_executor_cont.cpp : ex_generic_serial_executor_cont2 ] + [ thread-run2-noit ./sync/futures/async/async_executor_pass.cpp : async__async_executor_p2 ] + [ thread-run2 ../example/fib_task_region.cpp : ex_fib_task_region2 ] + [ thread-run2-noit ./experimental/parallel/v2/task_region_pass.cpp : task_region_p2 ] + [ thread-run2 ../example/parallel_quick_sort.cpp : ex_parallel_quick_sort2 ] + [ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate2 ] ; diff --git a/test/test_scheduled_tp.cpp b/test/test_scheduled_tp.cpp index b4503a8b..ca0f3f39 100644 --- a/test/test_scheduled_tp.cpp +++ b/test/test_scheduled_tp.cpp @@ -24,7 +24,7 @@ using namespace boost::chrono; -typedef boost::scheduled_thread_pool scheduled_tp; +typedef boost::scheduled_thread_pool<> scheduled_tp; void fn(int x) { @@ -46,19 +46,18 @@ void func2(scheduled_tp* tp, steady_clock::duration d) void test_timing(const int n) { //This function should take n seconds to execute. - boost::scheduled_thread_pool se(4); + boost::scheduled_thread_pool<> se(4); for(int i = 1; i <= n; i++) { se.submit_after(boost::bind(fn,i), milliseconds(i*100)); } - boost::this_thread::sleep_for(boost::chrono::seconds(10)); //dtor is called here so all task will have to be executed before we return } void test_deque_timing() { - boost::scheduled_thread_pool se(4); + boost::scheduled_thread_pool<> se(4); for(int i = 0; i < 10; i++) { steady_clock::duration d = milliseconds(i*100); @@ -85,10 +84,10 @@ void test_deque_multi(const int n) int main() { - steady_clock::time_point start = steady_clock::now(); + //steady_clock::time_point start = steady_clock::now(); test_timing(5); - steady_clock::duration diff = steady_clock::now() - start; - BOOST_TEST(diff > milliseconds(500)); + //steady_clock::duration diff = steady_clock::now() - start; + //BOOST_TEST(diff > milliseconds(500)); test_deque_timing(); test_deque_multi(4); test_deque_multi(8); diff --git a/test/test_scheduler.cpp b/test/test_scheduler.cpp index 5847ff01..c72715b8 100644 --- a/test/test_scheduler.cpp +++ b/test/test_scheduler.cpp @@ -27,7 +27,6 @@ typedef boost::executors::basic_thread_pool thread_pool; void fn(int x) { - //std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl; std::cout << x << std::endl; } @@ -75,7 +74,7 @@ int main() test_after(5, sch); test_at(5, sch); test_on(5, sch, tp); - boost::this_thread::sleep_for(boost::chrono::seconds(10)); + std::cout << "[" << __LINE__ << "] " << std::endl; return boost::report_errors(); } diff --git a/test/test_scheduling_adaptor.cpp b/test/test_scheduling_adaptor.cpp index ba43551f..c994e38c 100644 --- a/test/test_scheduling_adaptor.cpp +++ b/test/test_scheduling_adaptor.cpp @@ -28,7 +28,6 @@ typedef boost::executors::basic_thread_pool thread_pool; void fn(int x) { - //std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl; std::cout << x << std::endl; } @@ -41,14 +40,10 @@ void test_timing(const int n) sa.submit_after(boost::bind(fn,i),seconds(i)); sa.submit_after(boost::bind(fn,i), milliseconds(i*100)); } - boost::this_thread::sleep_for(boost::chrono::seconds(10)); } int main() { - steady_clock::time_point start = steady_clock::now(); test_timing(5); - steady_clock::duration diff = steady_clock::now() - start; - BOOST_TEST(diff > seconds(5)); return boost::report_errors(); }