2
0
mirror of https://github.com/boostorg/thread.git synced 2026-02-04 09:52:14 +00:00

Compare commits

..

44 Commits

Author SHA1 Message Date
Vicente J. Botet Escriba
6e427659a4 Remove the set_executor promise/packged_task member function. Added a promise constructor taking an Executor as parameter. The packaged_task will come later. 2015-10-29 23:15:48 +01:00
Vicente J. Botet Escriba
c108444e78 s/ex/excp when ex means exception. Mark the executor type erased code. 2015-10-29 18:18:29 +01:00
Vicente J. Botet Escriba
c0317c5206 Add return *this in invoker assignment. 2015-10-29 11:06:48 +01:00
Vicente J. Botet Escriba
06d2571ec6 Merge branch 'master' into develop 2015-10-22 23:05:13 +02:00
Vicente J. Botet Escriba
fdaba4efe7 try to fix Windows issue with system::time_point arithmetic. 2015-10-22 23:04:23 +02:00
Vicente J. Botet Escriba
8a7cd83123 Merge branch 'develop' 2015-10-22 11:22:27 +02:00
Vicente J. Botet Escriba
98a5e343f8 fix compiler error on time_point_cast. 2015-10-22 11:15:20 +02:00
Vicente J. Botet Escriba
89d8e18c82 Merge branch 'develop' 2015-10-22 00:43:17 +02:00
Vicente J. Botet Escriba
d9492530bd fix timi_point conversion. 2015-10-21 20:19:56 +02:00
Vicente J. Botet Escriba
f6c732b124 Merge branch 'develop' 2015-10-21 18:59:44 +02:00
Vicente J. Botet Escriba
dcc3227668 don't use steady clock if not supported. 2015-10-20 10:03:51 +02:00
Vicente J. Botet Escriba
b3d237731a avoid Boost.Test. 2015-10-20 10:03:15 +02:00
Vicente J. Botet Escriba
4321b59c1e Merge branch 'develop' 2015-10-20 00:23:51 +02:00
Vicente J. Botet Escriba
67759325eb update version 2015-10-20 00:23:06 +02:00
Vicente J. Botet Escriba
8153e2a652 try to fix MSVC issue with template class. 2015-10-19 20:01:42 +02:00
Vicente J. Botet Escriba
7876163c68 make sync optional. 2015-10-19 00:50:29 +02:00
Vicente J. Botet Escriba
805fa41a4e try to fix issues with gcc-3.x.y with not supported -Wno-variadic-macros. 2015-10-18 18:50:34 +02:00
Vicente J. Botet Escriba
0e6376d93a make async return a blocking future. 2015-10-18 18:47:24 +02:00
Vicente J. Botet Escriba
bf1fc5158e Added launch::inherit and specialize the behavior for the(launch::inherit, Cont). 2015-10-18 04:17:27 +02:00
Vicente J. Botet Escriba
1e4e9ab84c #11734. future::then(Cont) should be able to execute the contination on undetermined thread. 2015-10-18 00:36:25 +02:00
Vicente J. Botet Escriba
88ab663ac5 Merge branch 'feature/non_blocking_futures' into develop 2015-10-17 14:57:39 +02:00
Vicente J. Botet Escriba
0ab63b9248 Remove counting when async future blocking. 2015-10-17 11:38:06 +02:00
Vicente J. Botet Escriba
73053e4abe merge from develop. 2015-10-17 10:51:50 +02:00
Vicente J. Botet Escriba
9a4fbbec5d Merge branch 'develop' 2015-10-17 08:22:38 +02:00
Vicente J. Botet Escriba
b4744a2aa8 then can be now sync. The destructor of a shared state that joins check for thread_ifbefore join. Adde continuation_shared_state to share common behavior. 2015-10-17 00:44:45 +02:00
Vicente J. Botet Escriba
4169bcee44 Addoverloadsfor shared. 2015-10-17 00:40:53 +02:00
Vicente J. Botet Escriba
baf516e82f comment serial conttest. 2015-10-17 00:40:23 +02:00
Vicente J. Botet Escriba
cc309eef8d refactor ::then(f) in function of ::then(launch_policy, f). 2015-10-15 23:31:38 +02:00
Vicente J. Botet Escriba
45cc1704ef fix some missing decay. Cleanup adding init. 2015-10-15 19:19:07 +02:00
Vicente J. Botet Escriba
278a06fd47 Merge pull request #71 from jhunold/executors_fix
Fix executor forward
2015-10-12 23:49:48 +02:00
Jürgen Hunold
6ed577f4eb Fix executor forward
correct namespace and type
2015-10-12 08:01:19 +02:00
Vicente J. Botet Escriba
bf8459cf23 Merge from develop. 2015-10-11 23:34:45 +02:00
Vicente J. Botet Escriba
c7df715709 Merge branch 'develop' 2015-10-06 23:27:17 +02:00
Vicente J. Botet Escriba
773f8bfcb4 replace >> by > >. 2015-09-30 23:57:00 +02:00
Vicente J. Botet Escriba
8afcbe22af uncomment fixed test after merge from develop. 2015-09-30 00:08:16 +02:00
Vicente J. Botet Escriba
450f34daed merge from develop. 2015-09-30 00:01:33 +02:00
Vicente J. Botet Escriba
730cb550e6 Merge branch 'develop' 2015-09-27 08:24:26 +02:00
Vicente J. Botet Escriba
a3497e1ffc Merge branch 'develop' 2015-08-22 07:20:49 +02:00
Vicente J. Botet Escriba
f25bc8bbab Merge branch 'develop' 2015-04-18 11:56:34 +02:00
Vicente J. Botet Escriba
0f6a3ebbe5 Merge branch 'develop' 2015-03-21 15:44:02 +01:00
Vicente J. Botet Escriba
287100119a Merge branch 'develop' 2015-03-17 03:16:35 +01:00
Vicente J. Botet Escriba
d9594e7fc8 Merge branch 'develop' 2015-03-01 18:54:42 +01:00
Vicente J. Botet Escriba
855e56076b Merge branch 'develop' 2015-02-19 08:59:18 +01:00
Vicente J. Botet Escriba
3c6a183aa3 Merge branch 'develop' 2015-02-08 18:19:07 +01:00
45 changed files with 1412 additions and 3323 deletions

View File

@@ -60,7 +60,8 @@ project boost/thread
<toolset>gcc:<cxxflags>-Wno-long-long
#<toolset>gcc:<cxxflags>-ansi
#<toolset>gcc:<cxxflags>-fpermissive
<toolset>gcc:<cxxflags>-Wno-variadic-macros
<toolset>gcc-4:<cxxflags>-Wno-variadic-macros
<toolset>gcc-5:<cxxflags>-Wno-variadic-macros
#<toolset>gcc:<cxxflags>-Wunused-local-typedefs
<toolset>gcc:<cxxflags>-Wunused-function
<toolset>gcc:<cxxflags>-Wno-unused-parameter
@@ -70,7 +71,9 @@ project boost/thread
#<toolset>darwin:<cxxflags>-ansi
<toolset>darwin:<cxxflags>-fpermissive
<toolset>darwin:<cxxflags>-Wno-long-long
<toolset>darwin:<cxxflags>-Wno-variadic-macros
#<toolset>darwin:<cxxflags>-Wno-variadic-macros
<toolset>darwin-4:<cxxflags>-Wno-variadic-macros
<toolset>darwin-5:<cxxflags>-Wno-variadic-macros
#<toolset>darwin:<cxxflags>-Wunused-local-typedefs
<toolset>darwin:<cxxflags>-Wunused-function
<toolset>darwin:<cxxflags>-Wno-unused-parameter

View File

@@ -174,7 +174,7 @@ This has several advantages:
* The scheduled operations are available for all the executors via wrappers.
* The template functions could accept any chrono `time_point` and `duration` respectively as we are not working with virtual functions.
In order to manage with all the clocks, this library propose a generic solution. `scheduler<Clock>` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ.
In order to manage with all the clocks, this library propose generic solution. `scheduler<Clock>` know how to manage with the `submit_at`/`submit_after` `Clock::time_point`/`Clock::duration` tasks. Note that the durations on different clocks differ.
[heading Not Handled Exceptions]
As in N3785 and based on the same design decision than `std`/`boost::thread` if a user closure throws an exception, the executor must call the `std::terminate` function.
@@ -506,6 +506,9 @@ Executor abstract base class.
public:
typedef boost::work work;
executor(executor const&) = delete;
executor& operator=(executor const&) = delete;
executor();
virtual ~executor() {};
@@ -570,6 +573,9 @@ Polymorphic adaptor of a model of Executor to an executor.
public:
typedef executor::work work;
executor_adaptor(executor_adaptor const&) = delete;
executor_adaptor& operator=(executor_adaptor const&) = delete;
template <typename ...Args>
executor_adaptor(Args&& ... args);
@@ -635,24 +641,20 @@ Polymorphic adaptor of a model of Executor to an executor.
[/////////////////////////////////]
[section:generic_executor_ref Class `generic_executor_ref`]
Type erased executor class.
Executor abstract base class.
#include <boost/thread/generic_executor_ref.hpp>
namespace boost {
class generic_executor_ref
{
public:
generic_executor_ref(generic_executor_ref const&);
generic_executor_ref& operator=(generic_executor_ref const&);
template <class Executor>
generic_executor_ref(Executor& ex);
generic_executor_ref() = delete;
generic_executor_ref(generic_executor_ref const&) = default;
generic_executor_ref(generic_executor_ref &&) = default;
generic_executor_ref& operator=(generic_executor_ref const&) = default;
generic_executor_ref& operator=(generic_executor_ref &&) = default;
generic_executor_ref() {};
executor& underlying_executor() noexcept;
void close() = 0;
bool closed() = 0;
@@ -667,44 +669,6 @@ Type erased executor class.
[endsect]
[/////////////////////////////////]
[section:generic_executor Class `generic_executor`]
Type erased executor class.
#include <boost/thread/generic_executor.hpp>
namespace boost {
class generic_executor
{
public:
template <class Executor>
generic_executor(Executor& ex);
generic_executor() = delete;
generic_executor(generic_executor const&) = default;
generic_executor(generic_executor &&) = default;
generic_executor& operator=(generic_executor const&) = default;
generic_executor& operator=(generic_executor &&) = default;
executor& underlying_executor() noexcept;
void close() = 0;
bool closed() = 0;
template <typename Closure>
void submit(Closure&& closure);
virtual bool try_executing_one() = 0;
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[endsect]
[//////////////////////////////////////////////////////////]
[section: scheduler Template Class `scheduler `]
@@ -719,7 +683,10 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec
public:
using work = boost::function<void()> ;
using clock = Clock;
scheduler(scheduler const&) = delete;
scheduler& operator=(scheduler const&) = delete;
scheduler();
~scheduler();
@@ -766,7 +733,7 @@ Scheduler providing time related functions. Note that `scheduler` is not an Exec
[[Effects:] [Destroys the scheduler.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the destructor.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]]
]
@@ -1372,6 +1339,10 @@ A serial executor ensuring that there are no two work units that executes concur
class serial_executor
{
public:
serial_executor(serial_executor const&) = delete;
serial_executor& operator=(serial_executor const&) = delete;
template <class Executor>
serial_executor(Executor& ex);
Executor& underlying_executor() noexcept;
@@ -1422,7 +1393,7 @@ A serial executor ensuring that there are no two work units that executes concur
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
Executor& underlying_executor() noexcept;
generic_executor_ref& underlying_executor() noexcept;
[variablelist
@@ -1447,10 +1418,13 @@ A serial executor ensuring that there are no two work units that executes concur
class generic_serial_executor
{
public:
generic_serial_executor(generic_serial_executor const&) = delete;
generic_serial_executor& operator=(generic_serial_executor const&) = delete;
template <class Executor>
generic_serial_executor(Executor& ex);
generic_executor& underlying_executor() noexcept;
generic_executor_ref& underlying_executor() noexcept;
void close();
bool closed();
@@ -1459,7 +1433,6 @@ A serial executor ensuring that there are no two work units that executes concur
void submit(Closure&& closure);
bool try_executing_one();
template <typename Pred>
bool reschedule_until(Pred const& pred);
@@ -1483,7 +1456,7 @@ A serial executor ensuring that there are no two work units that executes concur
[endsect]
[/////////////////////////////////////]
[section:destructor Destructor `~generic_serial_executor()`]
[section:destructor Destructor `~serial_executor()`]
~generic_serial_executor();
@@ -1495,80 +1468,6 @@ A serial executor ensuring that there are no two work units that executes concur
]
[endsect]
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
generic_executor& underlying_executor() noexcept;
[variablelist
[[Return:] [The underlying executor instance. ]]
]
[endsect]
[endsect]
[//////////////////////////////////////////////////////////]
[section:serial_executor_cont Template Class `serial_executor_cont`]
A serial executor ensuring that there are no two work units that executes concurrently.
#include <boost/thread/serial_executor_cont.hpp>
namespace boost {
template <class Executor>
class serial_executor_cont
{
public:
serial_executor_cont(Executor& ex);
Executor& underlying_executor() noexcept;
void close();
bool closed();
template <typename Closure>
void submit(Closure&& closure);
bool try_executing_one();
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[/////////////////////////////////////]
[section:constructor Constructor `serial_executor_cont(Executor&)`]
template <class Executor>
serial_executor_cont(Executor& ex);
[variablelist
[[Effects:] [Constructs a serial_executor_cont. ]]
[[Throws:] [Nothing. ]]
]
[endsect]
[/////////////////////////////////////]
[section:destructor Destructor `~serial_executor_cont()`]
~serial_executor_cont();
[variablelist
[[Effects:] [Destroys the serial_executor.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]]
]
[endsect]
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
@@ -1579,8 +1478,6 @@ A serial executor ensuring that there are no two work units that executes concur
[[Return:] [The underlying executor instance. ]]
[[Throws:] [Nothing.]]
]
@@ -1588,82 +1485,6 @@ A serial executor ensuring that there are no two work units that executes concur
[endsect]
[//////////////////////////////////////////////////////////]
[section:generic_serial_cont_executor Class `generic_serial_cont_executor`]
A serial executor ensuring that there are no two work units that executes concurrently.
#include <boost/thread/generic_serial_cont_executor.hpp>
namespace boost {
class generic_serial_cont_executor
{
public:
template <class Executor>
generic_serial_cont_executor(Executor& ex);
generic_executor& underlying_executor() noexcept;
void close();
bool closed();
template <typename Closure>
void submit(Closure&& closure);
bool try_executing_one();
template <typename Pred>
bool reschedule_until(Pred const& pred);
};
}
[/////////////////////////////////////]
[section:constructor Constructor `generic_serial_cont_executor(Executor&)`]
template <class Executor>
generic_serial_cont_executor(Executor& ex);
[variablelist
[[Effects:] [Constructs a generic_serial_cont_executor. ]]
[[Throws:] [Nothing. ]]
]
[endsect]
[/////////////////////////////////////]
[section:destructor Destructor `~generic_serial_cont_executor()`]
~generic_serial_cont_executor();
[variablelist
[[Effects:] [Destroys the generic_serial_cont_executor.]]
[[Synchronization:] [The completion of all the closures happen before the completion of the executor destructor.]]
]
[endsect]
[/////////////////////////////////////]
[section:underlying_executor Function member `underlying_executor()`]
generic_executor& underlying_executor() noexcept;
[variablelist
[[Return:] [The underlying executor instance. ]]
]
[endsect]
[endsect]
[//////////////////////////////////////////////////////////]
[section:inline_executor Class `inline_executor`]
@@ -1675,8 +1496,10 @@ A serial executor ensuring that there are no two work units that executes concur
class inline_executor
{
public:
inline_executor(inline_executor const&) = delete;
inline_executor& operator=(inline_executor const&) = delete;
inline_executor();
~inline_executor();
void close();
bool closed();
@@ -1737,6 +1560,9 @@ A thread pool with up to a fixed number of threads.
class basic_thread_pool
{
public:
basic_thread_pool(basic_thread_pool const&) = delete;
basic_thread_pool& operator=(basic_thread_pool const&) = delete;
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency());
template <class AtThreadEntry>
@@ -1797,6 +1623,9 @@ A thread_executor with a threads for each task.
{
public:
thread_executor(thread_executor const&) = delete;
thread_executor& operator=(thread_executor const&) = delete;
thread_executor();
template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry at_thread_entry);
@@ -1851,6 +1680,9 @@ A user scheduled executor.
class loop_executor
{
public:
loop_executor(loop_executor const&) = delete;
loop_executor& operator=(loop_executor const&) = delete;
loop_executor();
~loop_executor();

View File

@@ -23,7 +23,13 @@ Please take a look at [@http://www.boost.org/development/tests/master/developer/
[*New Experimental Features:]
* [@http://svn.boost.org/trac/boost/ticket/11231 #11231] Allow to set continuation future's destructor behavior to non-blocking
* [@http://svn.boost.org/trac/boost/ticket/11424 #11424] Provide shared_timed_mutex as an alternative name for shared_mutex and deprecate the use of shared_mutex as a timed mutex
* [@http://svn.boost.org/trac/boost/ticket/11734 #11734] future::then(Cont) should be able to execute the contination on undetermined thread
* [@http://svn.boost.org/trac/boost/ticket/11736 #11736] Allow to use launch::executor on future::then(launch::executor, cont)
* [@http://svn.boost.org/trac/boost/ticket/11737 #11737] Add a launch::inherit policy that can be used on ::then() to use the policy of the parent future
[*Fixed Bugs:]
@@ -35,23 +41,28 @@ Please take a look at [@http://www.boost.org/development/tests/master/developer/
* [@http://svn.boost.org/trac/boost/ticket/9309 #9309] test_latch fails often on clang-darwin-tot11
* [@http://svn.boost.org/trac/boost/ticket/10788 #10788] GetLogicalProcessor isn't available for Windows platform less or equals to 0x0502
* [@http://svn.boost.org/trac/boost/ticket/11090 #11090] ex_future_unwrap- ThreadSanitizer: lock-order-inversion (potential deadlock)
* [@http://svn.boost.org/trac/boost/ticket/11158 #11158] Pthread thread deadlock when faketime used
* [@http://svn.boost.org/trac/boost/ticket/11174 #11174] boost::condition_variable::timed_wait with predicate unexpectedly wakes up while should wait infinite
* [@http://svn.boost.org/trac/boost/ticket/11185 #11185] Incorrect URL redirection
* [@http://svn.boost.org/trac/boost/ticket/11192 #11192] boost::future<>::then() with an executor doesn't compile when the callback returns a future
* [@http://svn.boost.org/trac/boost/ticket/11250 #11250] future made from make_exceptional fails on assertion in destructor
* [@http://svn.boost.org/trac/boost/ticket/11256 #11256] future<>::is_ready() == false in continuation function
* [@http://svn.boost.org/trac/boost/ticket/11158 #11158] Pthread thread deadlock when faketime used
* [@http://svn.boost.org/trac/boost/ticket/11261 #11261] bad use of scoped threads in basic_thread_pool
* [@http://svn.boost.org/trac/boost/ticket/11262 #11262] bad use of direct pointer in shared_state_nullary_task
* [@http://svn.boost.org/trac/boost/ticket/11263 #11263] lock already locked lock
* [@http://svn.boost.org/trac/boost/ticket/11266 #11266] boost::packaged_task has invalid variadic signature
* [@http://svn.boost.org/trac/boost/ticket/11302 #11302] boost thread doesn't build with BOOST_THREAD_PATCH.
* [@http://svn.boost.org/trac/boost/ticket/11322 #11322] sleep_for() nanoseconds overload will always return too early on windows
* [@http://svn.boost.org/trac/boost/ticket/11329 #11329] using declarative for GetProcessHeap, .... fails
* [@http://svn.boost.org/trac/boost/ticket/11368 #11368] boost thread's usage of CreateWaitableTimer wakes PC from sleep (doh)
* [@http://svn.boost.org/trac/boost/ticket/11377 #11377] Boost condition variable always waits for system clock deadline
* [@http://svn.boost.org/trac/boost/ticket/11435 #11435] gcc compiler warning in future.hpp
* [@http://svn.boost.org/trac/boost/ticket/11555 #11555] devector.hpp assumes allocator_traits_type is always present
* [@http://svn.boost.org/trac/boost/ticket/11562 #11562] Timer (using steady_clock) expires after computer time is set forward on Ubuntu 64-bit
* [@http://svn.boost.org/trac/boost/ticket/11672 #11672] Thread: Should use unique_ptr, not auto_ptr
* [@http://svn.boost.org/trac/boost/ticket/11688 #11688] thread::try_join_until: Avoid busy wait if system clock changes
* [@http://svn.boost.org/trac/boost/ticket/11672 #11716] ::then(f) should inherit the parent Executor
[heading Version 4.5.0 - boost 1.58]

View File

@@ -29,9 +29,11 @@
enum class launch
{
none = unspecified,
async = unspecified,
deferred = unspecified,
executor = unspecified,
inherit = unspecified,
any = async | deferred
};
@@ -165,14 +167,27 @@
enum class launch
{
none = unspecified,
async = unspecified,
deferred = unspecified,
executor = unspecified,
inherit = unspecified,
any = async | deferred
};
The enum type launch is a bitmask type with launch::async and launch::deferred denoting individual bits.
A future created with `promise<>` or with a `packaged_task<>` or with `make_ready_future`/`make_exceptional_future` (has no associated launch policy), has an implicit a launch policy of `launch::none`.
A future created by `async(launch::async, ...)` or `::then(launch::async, ...)` has associated a launch policy `launch::async`.
A future created by `async(launch::deferred, ...)` or `::then(launch::deferred, ...)` has associated a launch policy `launch::deferred`.
A future created by `async(Executor, ...)` or `::then(Executor, ...)` or `::then(launch::executor, ...)` has associated a launch policy `launch::executor`.
A future created by `async(...)` or `::then(...)` has associated a launch policy `launch::none`.
A future created by `::then(launch::inherit, ...)` has associated a launch policy parent future.
The `executor` and the `inherit` launch policies have a sense only can be user only on `then()`.
[endsect]
[///////////////////////////////////////////////////////////////////////////]
[section:is_error_code_enum Specialization `is_error_code_enum<future_errc>`]
@@ -902,25 +917,37 @@ future object as a parameter. The second function takes a executor as the first
the second parameter. The third function takes a launch policy as the first parameter and a callable object as the
second parameter.]]
[[Requires:] [`INVOKE(DECAY_COPY (std::forward<F>(func)), std::move(*this))` shall be a valid expression.]]
[[Effects:] [
All the functions create a shared state that is associated with the returned future object. The further behavior of the functions is as follows.
All the functions create a shared state that is associated with the returned future object. Additionally,
- The continuation is called when the object's shared state is ready (has a value or exception stored).
- When the object's shared state is ready, the continuation
`INVOKE(DECAY_COPY(std::forward<F>(func)), std::move(*this))` is called depending on the overload (see below) with the call to DECAY_COPY() being evaluated in the thread that called then.
- The continuation launches according to the specified policy or executor.
- Any value returned from the continuation is stored as the result in the shared state of the resulting `future`.
Any exception propagated from the execution of the continuation is stored as the exceptional result in the shared state of the resulting `future`.
- When the executor or launch policy is not provided the continuation inherits the
parent's launch policy or executor.
- Any value returned from the continuation is stored as the result in the shared state of the resulting `future`. Any exception propagated from the execution of the continuation is stored as the exceptional result in the shared state of the resulting `future`.
The continuation launches according to the specified policy or executor or noting.
- If the parent was created with `promise<>` or with a `packaged_task<>` (has no associated launch policy), the
continuation behaves the same as the third overload with a policy argument of `launch::async | launch::deferred` and
the same argument for `func`.
- When the launch policy is `launch::none` the continuation is called on an unspecified thread of execution.
- If the parent has a policy of `launch::deferred` and the continuation does not have a specified launch policy or
scheduler, then the parent is filled by immediately calling `.wait()`, and the policy of the antecedent is
- When the launch policy is `launch::async` the continuation is called on a new thread of execution.
- When the launch policy is `launch::deferred` the continuation is called on demand.
- When the launch policy is `launch::executor` the continuation is called on one of the thread of execution of the executor.
- When the launch policy is `launch::inherit` the continuation inherits the parent's launch policy or executor.
- When the executor or launch policy is not provided (first overload) is if as if launch::none was specified.
- When the executor is provided (second overload) the continuation is called on one of the thread of execution of the executor.
- If the parent has a policy of `launch::deferred` and the continuation does not have a specified launch policy
executor, then the parent is filled by immediately calling `.wait()`, and the policy of the antecedent is
`launch::deferred`.
]]
@@ -1380,22 +1407,38 @@ shared_future object as a parameter. The second function takes an executor as th
the second parameter. The third function takes a launch policy as the first parameter and a callable object as the
second parameter.]]
[[Requires:] [`INVOKE(DECAY_COPY (std::forward<F>(func)), *this)` shall be a valid expression.]]
[[Effects:] [
- The continuation is called when the object's shared state is ready (has a value or exception stored).
All the functions create a shared state that is associated with the returned future object. Additionally,
- The continuation launches according to the specified policy or executor.
- When the object's shared state is ready, the continuation
`INVOKE(DECAY_COPY(std::forward<F>(func)), *this)` is called depending on the overload (see below) with the call to DECAY_COPY() being evaluated in the thread that called then.
- When the executor or launch policy is not provided the continuation inherits the
parent's launch policy or executor.
- Any value returned from the continuation is stored as the result in the shared state of the resulting `future`.
Any exception propagated from the execution of the continuation is stored as the exceptional result in the shared state of the resulting `future`.
- If the parent was created with `promise` or with a `packaged_task` (has no associated launch policy), the
continuation behaves the same as the third overload with a policy argument of `launch::async | launch::deferred` and
the same argument for func.
- If the parent has a policy of `launch::deferred` and the continuation does not have a specified launch policy or
The continuation launches according to the specified policy or executor or noting.
- When the launch policy is `launch::none` the continuation is called on an unspecified thread of execution.
- When the launch policy is `launch::async` the continuation is called on a new thread of execution.
- When the launch policy is `launch::deferred` the continuation is called on demand.
- When the launch policy is `launch::executor` the continuation is called on one of the thread of execution of the executor.
- When the launch policy is `launch::inherit` the continuation inherits the parent's launch policy or executor.
- When the executor or launch policy is not provided (first overload) is if as if launch::none was specified.
- When the executor is provided (second overload) the continuation is called on one of the thread of execution of the executor.
- If the parent has a policy of `launch::deferred` and the continuation does not have a specified launch policy
executor, then the parent is filled by immediately calling `.wait()`, and the policy of the antecedent is
`launch::deferred`
`launch::deferred`.
]]

View File

@@ -8,7 +8,7 @@
[library Thread
[quickbook 1.5]
[version 4.5.0]
[version 4.6.0]
[authors [Williams, Anthony] [Botet Escriba, Vicente J.]]
[copyright 2007-11 Anthony Williams]
[copyright 2011-15 Vicente J. Botet Escriba]

View File

@@ -18,7 +18,7 @@
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp>
@@ -73,7 +73,7 @@ void submit_some(boost::executor& tp)
}
void at_th_entry(boost::basic_thread_pool )
void at_th_entry(boost::basic_thread_pool& )
{
}
@@ -84,118 +84,66 @@ int test_executor_adaptor()
{
try
{
// {
// std::cout << BOOST_CONTEXTOF << std::endl;
// boost::basic_thread_pool e1;
// std::cout << BOOST_CONTEXTOF << std::endl;
// boost::basic_thread_pool e2 = e1;
// std::cout << BOOST_CONTEXTOF << std::endl;
// }
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::executor_adaptor < boost::basic_thread_pool > ea(4);
std::cout << BOOST_CONTEXTOF << std::endl;
submit_some( ea);
std::cout << BOOST_CONTEXTOF << std::endl;
}
#if 1
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::executor_adaptor < boost::basic_thread_pool > ea(4);
// fixme
// ERROR= tr1::bad_weak_ptr
// {
// boost::future<int> t1 = boost::async(ea, &f1);
// boost::future<int> t2 = boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
std::cout << BOOST_CONTEXTOF << std::endl;
submit_some(ea);
// std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea3(1);
// std::cout << BOOST_CONTEXTOF << std::endl;
// boost::future<int> t1 = boost::async(ea3, &f1);
// std::cout << BOOST_CONTEXTOF << std::endl;
// boost::future<int> t2 = boost::async(ea3, &f1);
// std::cout << BOOST_CONTEXTOF << std::endl;
// //boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
// //boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
{
boost::future<int> t1 = boost::async(ea, &f1);
boost::future<int> t2 = boost::async(ea, &f1);
std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
std::cout << BOOST_CONTEXTOF << std::endl;
submit_some(ea);
std::cout << BOOST_CONTEXTOF << std::endl;
}
{
boost::basic_thread_pool ea3(1);
std::cout << BOOST_CONTEXTOF << std::endl;
boost::future<int> t1 = boost::async(ea3, &f1);
std::cout << BOOST_CONTEXTOF << std::endl;
boost::future<int> t2 = boost::async(ea3, &f1);
std::cout << BOOST_CONTEXTOF << std::endl;
//boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
//boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
#endif
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::loop_executor e1;
boost::loop_executor e2 = e1;
boost::executor_adaptor < boost::loop_executor > ea2(e2);
submit_some( ea2);
ea2.underlying_executor().run_queued_closures();
std::cout << BOOST_CONTEXTOF << std::endl;
submit_some(ea);
std::cout << BOOST_CONTEXTOF << std::endl;
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::executor_adaptor < boost::loop_executor > ea2;
submit_some( ea2);
ea2.underlying_executor().run_queued_closures();
}
// std::cout << BOOST_CONTEXTOF << std::endl;
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool tp;
boost::generic_serial_executor e1(tp);
boost::generic_serial_executor e2 = e1;
}
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2);
submit_some(ea3);
}
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
boost::executor_adaptor < boost::generic_serial_executor > ea3(ea2);
submit_some(ea3);
}
//#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
{
boost::basic_thread_pool ea1(4);
boost::executor_adaptor < boost::generic_serial_executor > ea2(ea1);
submit_some(ea2);
}
//#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::inline_executor e1;
boost::inline_executor e2 = e1;
boost::executor_adaptor < boost::inline_executor > ea2(e2);
boost::executor_adaptor < boost::basic_thread_pool > ea1(4);
boost::executor_adaptor < boost::serial_executor > ea2(ea1);
submit_some(ea2);
}
#endif
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::executor_adaptor < boost::inline_executor > ea1;
submit_some(ea1);
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::thread_executor e1;
boost::thread_executor e2 = e1;
}
{
boost::thread_executor e1;
boost::executor_adaptor < boost::generic_executor > ea2(e1);
submit_some(ea2);
}
{
boost::executor_adaptor < boost::thread_executor > ea1;
submit_some(ea1);
}
std::cout << BOOST_CONTEXTOF << std::endl;
#if 0
#if 1
// fixme
// ERROR= tr1::bad_weak_ptr
{
@@ -208,7 +156,7 @@ int test_executor_adaptor()
{
boost::async(&f1);
}
#if 0
#if 1
// fixme
// ERROR= tr1::bad_weak_ptr

View File

@@ -1,184 +0,0 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
//boost::this_thread::sleep_for(boost::chrono::milliseconds(200));
}
void p2()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
//boost::this_thread::sleep_for(boost::chrono::seconds(10));
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_executor tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
template < class Executor>
void submit_some2(Executor& tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
template <class Executor>
void submit_some3(boost::serial_executor<Executor>& tp)
{
for (int i = 0; i < 3; ++i) {
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
tp.submit(&p1);
}
}
void at_th_entry(boost::basic_thread_pool)
{
}
int test_generic_executor()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
{
boost::basic_thread_pool ea(4);
submit_some( ea);
// {
// boost::future<int> t1 = boost::async(ea, &f1);
// boost::future<int> t2 = boost::async(ea, &f1);
// // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
submit_some(ea);
// {
// boost::basic_thread_pool ea3(1);
// boost::future<int> t1 = boost::async(ea3, &f1);
// boost::future<int> t2 = boost::async(ea3, &f1);
// //boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
// //boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
// // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
submit_some(ea);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::loop_executor ea2;
submit_some( ea2);
ea2.run_queued_closures();
}
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// // std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea1(4);
// boost::generic_serial_executor ea2(ea1);
// submit_some(ea2);
// }
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::serial_executor<boost::basic_thread_pool> ea2(ea1);
submit_some3(ea2);
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::inline_executor ea1;
submit_some(ea1);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
{
//boost::thread_executor ea1;
//submit_some(ea1);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea(4, at_th_entry);
// boost::future<int> t1 = boost::async(ea, &f1);
// // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// }
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_generic_executor();
}

View File

@@ -17,12 +17,11 @@
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/loop_executor.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/serial_executor.hpp>
#include <boost/thread/executors/inline_executor.hpp>
#include <boost/thread/executors/thread_executor.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
@@ -65,7 +64,7 @@ void submit_some(boost::generic_executor_ref tp)
}
void at_th_entry(boost::basic_thread_pool)
void at_th_entry(boost::basic_thread_pool& )
{
}
@@ -81,22 +80,22 @@ int test_generic_executor_ref()
{
boost::basic_thread_pool ea(4);
submit_some( ea);
// {
// boost::future<int> t1 = boost::async(ea, &f1);
// boost::future<int> t2 = boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
// submit_some(ea);
// {
// boost::basic_thread_pool ea3(1);
// boost::future<int> t1 = boost::async(ea3, &f1);
// boost::future<int> t2 = boost::async(ea3, &f1);
// //boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
// //boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
// }
{
boost::future<int> t1 = boost::async(ea, &f1);
boost::future<int> t2 = boost::async(ea, &f1);
std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
submit_some(ea);
{
boost::basic_thread_pool ea3(1);
boost::future<int> t1 = boost::async(ea3, &f1);
boost::future<int> t2 = boost::async(ea3, &f1);
//boost::future<int> t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11
//boost::future<int> t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98
std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl;
}
submit_some(ea);
}
std::cout << BOOST_CONTEXTOF << std::endl;
@@ -109,7 +108,7 @@ int test_generic_executor_ref()
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
boost::serial_executor ea2(ea1);
submit_some(ea2);
}
#endif
@@ -123,18 +122,18 @@ int test_generic_executor_ref()
//boost::thread_executor ea1;
//submit_some(ea1);
}
// std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea(4, at_th_entry);
// boost::future<int> t1 = boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
// }
// std::cout << BOOST_CONTEXTOF << std::endl;
// {
// boost::basic_thread_pool ea(4, at_th_entry);
// boost::async(ea, &f1);
// std::cout << BOOST_CONTEXTOF << std::endl;
// }
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea(4, at_th_entry);
boost::future<int> t1 = boost::async(ea, &f1);
std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl;
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea(4, at_th_entry);
boost::async(ea, &f1);
std::cout << BOOST_CONTEXTOF << std::endl;
}
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(200));
std::cout << BOOST_CONTEXTOF << std::endl;

View File

@@ -1,117 +0,0 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/generic_serial_executor.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(30));
std::cout << BOOST_CONTEXTOF << std::endl;
}
void p2()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));
std::cout << BOOST_CONTEXTOF << std::endl;
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_serial_executor& tp)
{
//std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1);
}
//std::cout << BOOST_CONTEXTOF << std::endl;
}
void at_th_entry(boost::basic_thread_pool )
{
}
int test_executor_adaptor()
{
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool tp;
boost::generic_serial_executor e1(tp);
boost::generic_serial_executor e2 = e1;
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::generic_serial_executor ea2(ea1);
submit_some(ea2);
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_executor_adaptor();
}

View File

@@ -1,124 +0,0 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif
#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
//#define BOOST_THREAD_USES_LOG
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#include <boost/thread/caller_context.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/generic_serial_executor_cont.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/assert.hpp>
#include <string>
#include <iostream>
void p1()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(30));
std::cout << BOOST_CONTEXTOF << std::endl;
}
void p2()
{
std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10));
std::cout << BOOST_CONTEXTOF << std::endl;
}
int f1()
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(1));
return 1;
}
int f2(int i)
{
// std::cout << BOOST_CONTEXTOF << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
void submit_some(boost::generic_serial_executor_cont& tp)
{
//std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1);
}
//std::cout << BOOST_CONTEXTOF << std::endl;
}
void at_th_entry(boost::basic_thread_pool)
{
}
int test_executor_adaptor()
{
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool tp;
std::cout << BOOST_CONTEXTOF << std::endl;
boost::generic_serial_executor_cont e1(tp);
std::cout << BOOST_CONTEXTOF << std::endl;
boost::generic_serial_executor_cont e2 = e1;
std::cout << BOOST_CONTEXTOF << std::endl;
}
std::cout << BOOST_CONTEXTOF << std::endl;
{
try
{
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
std::cout << BOOST_CONTEXTOF << std::endl;
boost::generic_serial_executor_cont ea2(ea1);
std::cout << BOOST_CONTEXTOF << std::endl;
submit_some(ea2);
std::cout << BOOST_CONTEXTOF << std::endl;
}
#endif
std::cout << BOOST_CONTEXTOF << std::endl;
}
catch (std::exception& ex)
{
std::cout << "ERROR= " << ex.what() << "" << std::endl;
return 1;
}
catch (...)
{
std::cout << " ERROR= exception thrown" << std::endl;
return 2;
}
}
// std::cout << BOOST_CONTEXTOF << std::endl;
return 0;
}
int main()
{
return test_executor_adaptor();
}

View File

@@ -51,8 +51,8 @@ int f2(int i)
boost::this_thread::sleep_for(boost::chrono::seconds(2));
return i + 1;
}
template <class Executor>
void submit_some(boost::serial_executor<Executor>& tp)
void submit_some(boost::serial_executor& tp)
{
for (int i = 0; i < 3; ++i) {
std::cout << BOOST_CONTEXTOF << std::endl;
@@ -80,8 +80,9 @@ int test_executor_adaptor()
#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
{
boost::basic_thread_pool ea1(4);
boost::serial_executor<boost::basic_thread_pool> ea2(ea1);
boost::serial_executor ea2(ea1);
submit_some(ea2);
boost::this_thread::sleep_for(boost::chrono::seconds(10));
}
#endif
}

View File

@@ -52,19 +52,18 @@ int f2(int i)
return i + 1;
}
template < class Executor>
void submit_some(boost::serial_executor_cont<Executor>& tp)
void submit_some(boost::serial_executor_cont& tp)
{
//std::cout << BOOST_CONTEXTOF << std::endl;
std::cout << BOOST_CONTEXTOF << std::endl;
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p2);
}
for (int i = 0; i < 3; ++i) {
//std::cout << BOOST_CONTEXTOF << std::endl;
std::cout << BOOST_CONTEXTOF << std::endl;
tp.submit(&p1);
}
//std::cout << BOOST_CONTEXTOF << std::endl;
std::cout << BOOST_CONTEXTOF << std::endl;
}
@@ -85,8 +84,9 @@ int test_executor_adaptor()
// std::cout << BOOST_CONTEXTOF << std::endl;
{
boost::basic_thread_pool ea1(4);
boost::serial_executor_cont<boost::basic_thread_pool> ea2(ea1);
boost::serial_executor_cont ea2(ea1);
submit_some(ea2);
boost::this_thread::sleep_for(boost::chrono::seconds(10));
}
#endif
// std::cout << BOOST_CONTEXTOF << std::endl;

View File

@@ -57,6 +57,5 @@ namespace boost
// 20.8.2.4, class template enable_shared_from_this:
// 20.8.2.5, shared_ptr atomic access:
// 20.8.2.6 hash support
#include <boost/thread/csbl/memory/shared_ptr.hpp>
#endif // header

View File

@@ -11,30 +11,17 @@
#include <boost/thread/csbl/memory/config.hpp>
//#if defined BOOST_NO_CXX11_SMART_PTR
#if 1
#if defined BOOST_NO_CXX11_SMART_PTR
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/enable_shared_from_this.hpp>
namespace boost
{
template<class T> bool atomic_compare_exchange_strong( shared_ptr<T> * p, shared_ptr<T> * v, shared_ptr<T> w )
{
return atomic_compare_exchange(p,v,w);
}
namespace csbl
{
using ::boost::shared_ptr;
using ::boost::make_shared;
using ::boost::static_pointer_cast;
using ::boost::dynamic_pointer_cast;
using ::boost::const_pointer_cast;
using ::boost::enable_shared_from_this;
using ::boost::atomic_load;
using ::boost::atomic_compare_exchange_strong;
}
}
@@ -48,12 +35,6 @@ namespace boost
{
using std::shared_ptr;
using std::make_shared;
using std::dynamic_pointer_cast;
using std::static_pointer_cast;
using std::const_pointer_cast;
using std::enable_shared_from_this;
using std::atomic_load;
using std::atomic_compare_exchange_strong;
}
}

View File

@@ -72,13 +72,21 @@ namespace boost
BOOST_SYMBOL_VISIBLE
invoker& operator=(BOOST_THREAD_RV_REF(invoker) f)
{
f_ = boost::move(BOOST_THREAD_RV(f).f_);
if (this != &f)
{
f_ = boost::move(BOOST_THREAD_RV(f).f_);
}
return *this;
}
BOOST_SYMBOL_VISIBLE
invoker& operator=( BOOST_THREAD_COPY_ASSIGN_REF(invoker) f)
{
f_ = f.f_;
if (this != &f)
{
f_ = f.f_;
}
return *this;
}
result_type operator()()

View File

@@ -838,7 +838,7 @@ namespace boost
void BOOST_THREAD_DECL add_thread_exit_function(thread_exit_function_base*);
struct shared_state_base;
#if defined(BOOST_THREAD_PLATFORM_WIN32)
inline void make_ready_at_thread_exit(csbl::shared_ptr<shared_state_base> as)
inline void make_ready_at_thread_exit(shared_ptr<shared_state_base> as)
{
detail::thread_data_base* const current_thread_data(detail::get_current_thread_data());
if(current_thread_data)
@@ -847,7 +847,7 @@ namespace boost
}
}
#else
void BOOST_THREAD_DECL make_ready_at_thread_exit(csbl::shared_ptr<shared_state_base> as);
void BOOST_THREAD_DECL make_ready_at_thread_exit(shared_ptr<shared_state_base> as);
#endif
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013-2015 Vicente J. Botet Escriba
// Copyright (C) 2013-2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -18,11 +18,6 @@
#include <boost/thread/executors/work.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/smart_ptr/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -35,267 +30,126 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
private:
typedef thread thread_t;
/// A move aware vector type
typedef csbl::vector<thread_t> thread_vector;
struct shared_state : enable_shared_from_this<shared_state> {
typedef executors::work work;
/// the kind of stored threads are scoped threads to ensure that the threads are joined.
/// A move aware vector type
//typedef scoped_thread<> thread_t;
typedef thread thread_t;
typedef csbl::vector<thread_t> thread_vector;
/// A move aware vector
thread_vector threads;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
/// A move aware vector
thread_vector threads;
unsigned const thread_count;
boost::function<void(basic_thread_pool)> at_thread_entry;
friend class basic_thread_pool;
public:
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
public:
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
try
{
try
work task;
if (work_queue.try_pull(task) == queue_op_status::success)
{
work task;
if (work_queue.try_pull(task) == queue_op_status::success)
{
task();
return true;
}
return false;
task();
return true;
}
catch (...)
return false;
}
catch (...)
{
std::terminate();
//return false;
}
}
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
std::terminate();
return false;
this_thread::yield();
}
}
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
this_thread::yield();
}
}
private:
/**
* The main loop of the worker threads
*/
void worker_thread()
{
// fixme: this call results on segmentation fault
//at_thread_entry(basic_thread_pool(this->shared_from_this()));
try
{
for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) break;
task();
}
}
catch (...)
{
std::terminate();
return;
}
}
static void do_nothing_at_thread_entry(basic_thread_pool) {}
void init()
{
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&shared_state::worker_thread, this);
threads.push_back(thread_t(boost::move(th)));
}
}
catch (...)
{
close();
throw;
}
}
public:
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
shared_state(unsigned const thread_count = thread::hardware_concurrency()+1)
: thread_count(thread_count),
at_thread_entry(do_nothing_at_thread_entry)
{
}
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads
* and executes the at_thread_entry function at the entry of each created thread. .
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry>
shared_state( unsigned const thread_count, AtThreadEntry& at_thread_entry)
: thread_count(thread_count),
at_thread_entry(at_thread_entry)
{
}
#endif
shared_state( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool))
: thread_count(thread_count),
at_thread_entry(at_thread_entry)
{
}
template <class AtThreadEntry>
shared_state( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
: thread_count(thread_count),
at_thread_entry(boost::move(at_thread_entry))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~shared_state()
{
// signal to all the worker threads that there will be no more submissions.
close();
join();
// joins all the threads as the threads were scoped_threads
}
/**
* \b Effects: join all the threads.
*/
void join()
{
for (unsigned i = 0; i < threads.size(); ++i)
{
if (this_thread::get_id() == threads[i].get_id()) continue;
threads[i].join();
}
}
/**
* \b Effects: close the \c basic_thread_pool for submissions.
* The worker threads will work until there is no more closures to run.
*/
void close()
{
work_queue.close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return work_queue.closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
void submit(BOOST_THREAD_RV_REF(work) closure) {
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
//work_queue.push(work(closure));
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
//work_queue.push(work(closure));
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
//work_queue.push(work(boost::forward<Closure>(closure)));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
do {
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
};
}
private:
/**
* \b Effects: creates a thread pool with this shared state.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
* The main loop of the worker threads
*/
friend struct shared_state;
basic_thread_pool(shared_ptr<shared_state> ptr)
: pimpl(ptr)
void worker_thread()
{
try
{
for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) {
return;
}
task();
}
}
catch (...)
{
std::terminate();
return;
}
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry>
void worker_thread1(AtThreadEntry& at_thread_entry)
{
at_thread_entry(*this);
worker_thread();
}
#endif
void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
{
at_thread_entry(*this);
worker_thread();
}
template <class AtThreadEntry>
void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
{
at_thread_entry(*this);
worker_thread();
}
static void do_nothing_at_thread_entry(basic_thread_pool&) {}
public:
//basic_thread_pool(basic_thread_pool const&)=default;
//basic_thread_pool(basic_thread_pool &&)=default;
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
: pimpl(make_shared<shared_state>(thread_count))
{
pimpl->init();
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
#if 1
thread th (&basic_thread_pool::worker_thread, this);
threads.push_back(thread_t(boost::move(th)));
#else
threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
#endif
}
}
catch (...)
{
close();
throw;
}
}
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads
* and executes the at_thread_entry function at the entry of each created thread. .
@@ -305,24 +159,61 @@ namespace executors
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
: pimpl(make_shared<shared_state>(thread_count, at_thread_entry))
{
pimpl->init();
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
}
}
catch (...)
{
close();
throw;
}
}
#endif
basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool))
: pimpl(make_shared<shared_state>(thread_count, at_thread_entry))
basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
{
pimpl->init();
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
}
}
catch (...)
{
close();
throw;
}
}
template <class AtThreadEntry>
basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
: pimpl(make_shared<shared_state>(thread_count, boost::forward<AtThreadEntry>(at_thread_entry)))
{
pimpl->init();
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
threads.push_back(thread_t(boost::move(th)));
//threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
}
}
catch (...)
{
close();
throw;
}
}
/**
* \b Effects: Destroys the thread pool.
*
@@ -330,16 +221,10 @@ namespace executors
*/
~basic_thread_pool()
{
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
// signal to all the worker threads that there will be no more submissions.
close();
// joins all the threads before destroying the thread pool resources (e.g. the queue).
join();
}
/**
@@ -347,7 +232,10 @@ namespace executors
*/
void join()
{
pimpl->join();
for (unsigned i = 0; i < threads.size(); ++i)
{
threads[i].join();
}
}
/**
@@ -356,7 +244,7 @@ namespace executors
*/
void close()
{
pimpl->close();
work_queue.close();
}
/**
@@ -364,7 +252,7 @@ namespace executors
*/
bool closed()
{
return pimpl->closed();
return work_queue.closed();
}
/**
@@ -378,26 +266,28 @@ namespace executors
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
// void submit(BOOST_THREAD_RV_REF(work) closure) {
// work_queue.push(boost::move(closure));
// }
void submit(BOOST_THREAD_RV_REF(work) closure) {
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
//submit(work(boost::forward<Closure>(closure)));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
/**
@@ -408,16 +298,15 @@ namespace executors
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return pimpl->reschedule_until(pred);
do {
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
void schedule_one_or_yield()
{
return pimpl->schedule_one_or_yield();
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::basic_thread_pool;

View File

@@ -24,6 +24,7 @@ namespace detail
class priority_executor_base
{
public:
//typedef boost::function<void()> work;
typedef executors::work_pq work;
protected:
typedef Queue queue_type;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2015 Vicente J. Botet Escriba
// Copyright (C) 2013,2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -27,6 +27,8 @@ namespace boost
/// type-erasure to store the works to do
typedef executors::work work;
/// executor is not copyable.
BOOST_THREAD_NO_COPYABLE(executor)
executor() {}
/**
@@ -127,6 +129,7 @@ namespace boost
bool reschedule_until(Pred const& pred)
{
do {
//schedule_one_or_yield();
if ( ! try_executing_one())
{
return false;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2015 Vicente J. Botet Escriba
// Copyright (C) 2013,2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -12,7 +12,6 @@
#include <boost/thread/detail/config.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -31,52 +30,21 @@ namespace executors
/// type-erasure to store the works to do
typedef executor::work work;
/// executor is not copyable.
BOOST_THREAD_NO_COPYABLE(executor_adaptor)
/**
* executor_adaptor constructor
*/
//executor_adaptor(executor_adaptor const&) = default;
//executor_adaptor(executor_adaptor &&) = default;
BOOST_THREAD_COPYABLE_AND_MOVABLE(executor_adaptor)
executor_adaptor(executor_adaptor const& x) : ex(x.ex) {}
executor_adaptor(BOOST_THREAD_RV_REF(executor_adaptor) x) : ex(boost::move(x.ex)) {}
executor_adaptor& operator=(BOOST_THREAD_COPY_ASSIGN_REF(executor_adaptor) x)
{
if (this != &ex)
{
ex = x.ex;
}
return *this;
}
executor_adaptor& operator=(BOOST_THREAD_RV_REF(executor_adaptor) x)
{
if (this != &ex)
{
ex = boost::move(x.ex);
}
return *this;
}
executor_adaptor(Executor const& ex) : ex(ex) {}
executor_adaptor(BOOST_THREAD_RV_REF(Executor) ex) : ex(boost::move(ex)) {}
executor_adaptor() : ex() {}
#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
template <typename Arg, typename ...Args>
executor_adaptor(BOOST_THREAD_FWD_REF(Arg) arg, BOOST_THREAD_FWD_REF(Args) ... args
, typename disable_if_c<
is_same<typename decay<Arg>::type, executor_adaptor>::value
||
is_same<typename decay<Arg>::type, Executor>::value
, int* >::type=0
)
: ex(boost::forward<Arg>(arg), boost::forward<Args>(args)...) {}
template <typename ...Args>
executor_adaptor(BOOST_THREAD_RV_REF(Args) ... args) : ex(boost::forward<Args>(args)...) {}
#else
/**
* executor_adaptor constructor
*/
executor_adaptor() : ex() {}
template <typename A1>
executor_adaptor(
BOOST_THREAD_FWD_REF(A1) a1
@@ -161,9 +129,6 @@ namespace executors
};
}
using executors::executor_adaptor;
BOOST_THREAD_DCL_MOVABLE_BEG(T) executor_adaptor<T> BOOST_THREAD_DCL_MOVABLE_END
}
#include <boost/config/abi_suffix.hpp>

View File

@@ -1,150 +0,0 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP
#define BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/executor_adaptor.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_executor
{
shared_ptr<executor> ex;
public:
/// type-erasure to store the works to do
typedef executors::work work;
//generic_executor(generic_executor const&) = default;
//generic_executor(generic_executor &&) = default;
template<typename Executor>
generic_executor(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_executor>,
int* >::type = (int*)0
)
//: ex(make_shared<executor_adaptor<Executor> >(ex)) // todo check why this doesn't work with C++03
: ex( new executor_adaptor<Executor>(ex) )
{
}
//generic_executor(generic_executor const& other) noexcept {}
//generic_executor& operator=(generic_executor const& other) noexcept {}
/**
* \par Effects
* Close the \c executor for submissions.
* The worker threads will work until there is no more closures to run.
*/
void close() { ex->close(); }
/**
* \par Returns
* Whether the pool is closed for submissions.
*/
bool closed() { return ex->closed(); }
void submit(BOOST_THREAD_RV_REF(work) closure)
{
ex->submit(boost::forward<work>(closure));
}
/**
* \par Requires
* \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible.
*
* \par Effects
* The specified closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads.
*
* \par Synchronization
* Completion of closure on a particular thread happens before destruction of thread's thread local variables.
*
* \par Throws
* \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
work w ((closure));
submit(boost::move(w));
}
#endif
void submit(void (*closure)())
{
work w ((closure));
submit(boost::move(w));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
work w = boost::move(closure);
submit(boost::move(w));
}
// size_t num_pending_closures() const
// {
// return ex->num_pending_closures();
// }
/**
* \par Effects
* Try to execute one task.
*
* \par Returns
* Whether a task has been executed.
*
* \par Throws
* Whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one() { return ex->try_executing_one(); }
/**
* \par Requires
* This must be called from an scheduled task.
*
* \par Effects
* reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
do {
//schedule_one_or_yield();
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
};
}
using executors::generic_executor;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -13,8 +13,7 @@
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -100,8 +99,8 @@ namespace boost
template<typename Executor>
generic_executor_ref(Executor& ex)
//: ex(make_shared<executor_ref<typename decay<Executor>::type> >(ex)) // todo check why this doesn't work with C++03
: ex( new executor_ref<typename decay<Executor>::type>(ex) )
//: ex(make_shared<executor_ref<Executor> >(ex)) // todo check why this doesn't works with C++03
: ex( new executor_ref<Executor>(ex) )
{
}

View File

@@ -1,295 +0,0 @@
// Copyright (C) 2013,2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// 2013/11 Vicente J. Botet Escriba
// first implementation of a simple serial scheduler.
#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP
#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/type_traits/is_same.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/thread/caller_context.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_serial_executor
{
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
typedef thread thread_t;
//typedef scoped_thread<> thread_t;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
generic_executor ex;
thread_t thr;
struct try_executing_one_task {
work& task;
boost::promise<void> &p;
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
try {
task();
p.set_value();
} catch (...)
{
p.set_exception(current_exception());
}
}
};
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex; }
private:
/**
* The main loop of the worker thread
*/
void worker_thread()
{
try
{
for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return;
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
}
}
catch (...)
{
std::terminate();
return;
}
}
public:
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
shared_state(Executor const& ex)
: ex(ex), thr(&shared_state::worker_thread, this)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
join();
}
/**
* \b Effects: join all the threads.
*/
void join()
{
if (this_thread::get_id() == thr.get_id()) return;
thr.join();
}
/**
* \b Effects: close the \c generic_serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
work_queue.close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return work_queue.closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
work_queue.push(work(closure));
}
#endif
void submit(void (*closure)())
{
work_queue.push(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
work_queue.push(work(boost::forward<Closure>(closure)));
}
};
public:
// generic_serial_executor(generic_serial_executor const&) = default;
// generic_serial_executor(generic_serial_executor &&) = default;
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
generic_serial_executor(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_serial_executor>,
int* >::type = (int*)0)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor destructor.
*/
~generic_serial_executor()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c generic_serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c generic_serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::generic_serial_executor;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -1,265 +0,0 @@
// Copyright (C) 2015 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// 2013/11 Vicente J. Botet Escriba
// first implementation of a simple serial scheduler.
#ifndef BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP
#define BOOST_THREAD_GENERIC_SERIAL_EXECUTOR_CONT_HPP
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/type_traits/is_same.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
class generic_serial_executor_cont
{
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
generic_executor ex_;
future<void> fut_; // protected by mtx_
bool closed_; // protected by mtx_
mutex mtx_;
struct continuation {
work task;
template <class X>
struct result {
typedef void type;
};
continuation(BOOST_THREAD_RV_REF(work) tsk)
: task(boost::move(tsk)) {}
void operator()(future<void> f)
{
try {
task();
} catch (...) {
std::terminate();
}
}
};
bool closed(lock_guard<mutex>&) const
{
return closed_;
}
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT { return ex_; }
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*
* \b Notes:
* * The lifetime of the associated executor must outlive the serial executor.
* * The current implementation doesn't support submission from synchronous continuation, that is,
* - the executor must execute the continuation asynchronously or
* - the continuation can not submit to this serial executor.
*/
template <class Executor>
shared_state(Executor const& ex)
: ex_(ex), fut_(make_ready_future()), closed_(false)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c generic_serial_executor_cont destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
}
/**
* \b Effects: close the \c generic_serial_executor_cont for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish.
* If the invoked closure throws an exception the \c generic_serial_executor_cont will call \c std::terminate, as is the case with threads.
*
* \b Throws: \c sync_queue_is_closed if the executor is closed.
* Whatever exception that can be throw while storing the closure.
*
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
#endif
void submit(void (*closure)())
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
}
};
public:
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
generic_serial_executor_cont(Executor const& ex
, typename boost::disable_if<is_same<Executor, generic_serial_executor_cont>,
int* >::type = (int*)0)
: pimpl(make_shared<shared_state>(ex))
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
*/
~generic_serial_executor_cont()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::generic_serial_executor_cont;
}
#include <boost/config/abi_suffix.hpp>
#endif

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014-2015 Vicente J. Botet Escriba
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -14,9 +14,6 @@
#include <boost/thread/detail/move.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -28,227 +25,142 @@ namespace executors
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
bool closed_;
mutable mutex mtx_;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return false;
}
struct shared_state {
typedef executors::work work;
bool closed_;
mutable mutex mtx_;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return false;
}
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
shared_state()
: closed_(false)
{
}
/**
* \b Effects: Destroys the inline executor.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor.
*/
~shared_state()
{
// signal to all the worker thread that there will be no more submissions.
close();
}
/**
* \b Effects: close the \c inline_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed(lock_guard<mutex>& ) const
{
return closed_;
}
bool closed() const
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
#endif
void submit(void (*closure)())
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& )
{
return false;
}
};
public:
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
inline_executor()
: pimpl(make_shared<shared_state>())
{
}
/// inline_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(inline_executor)
/**
* \b Effects: close the \c inline_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
inline_executor()
: closed_(false)
{
}
/**
* \b Effects: Destroys the inline executor.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor.
*/
~inline_executor()
{
// signal to all the worker thread that there will be no more submissions.
close();
}
/**
* \b Returns: whether the executor is closed for submissions.
*/
bool closed() const
{
return pimpl->closed();
}
/**
* \b Effects: close the \c inline_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed(lock_guard<mutex>& )
{
return closed_;
}
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
template <typename Closure>
void submit(Closure & closure)
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
}
void submit(void (*closure)())
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& )
{
return false;
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& p)
{
return pimpl->reschedule_until(p);
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::inline_executor;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013-2015 Vicente J. Botet Escriba
// Copyright (C) 2013,2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -17,9 +17,6 @@
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -33,178 +30,59 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
private:
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
struct shared_state {
typedef executors::work work;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
work task;
try
{
if (work_queue.try_pull(task) == queue_op_status::success)
{
task();
return true;
}
return false;
}
catch (...)
{
std::terminate();
return false;
}
}
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
this_thread::yield();
}
}
/// loop_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
shared_state()
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
*/
~shared_state()
{
// signal to all the worker thread that there will be no more submissions.
close();
}
/**
* The main loop of the worker thread
*/
void loop()
{
while (!closed())
{
schedule_one_or_yield();
}
while (try_executing_one())
{
}
}
/**
* \b Effects: close the \c loop_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
work_queue.close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return work_queue.closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
void submit(BOOST_THREAD_RV_REF(work) closure) {
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
//work_queue.push(work(closure));
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
//work_queue.push(work(closure));
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
//work_queue.push(work(boost::forward<Closure>(closure)));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
do {
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
/**
* run queued closures
*/
void run_queued_closures()
{
sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
while (! q.empty())
{
work& task = q.front();
task();
q.pop_front();
}
}
};
public:
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
work task;
try
{
if (work_queue.try_pull(task) == queue_op_status::success)
{
task();
return true;
}
return false;
}
catch (...)
{
std::terminate();
//return false;
}
}
private:
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
this_thread::yield();
}
}
public:
/// loop_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(loop_executor)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
loop_executor()
: pimpl(make_shared<shared_state>())
{
}
/**
@@ -214,33 +92,22 @@ namespace executors
*/
~loop_executor()
{
// signal to all the worker thread that there will be no more submissions.
close();
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
}
// /**
// * Effects: schedule one task or yields
// * Throws: whatever the current task constructor throws or the task() throws.
// */
// void schedule_one_or_yield()
// {
// return pimpl->schedule_one_or_yield();
// }
/**
* The main loop of the worker thread
*/
void loop()
{
pimpl->loop();
while (!closed())
{
schedule_one_or_yield();
}
while (try_executing_one())
{
}
}
/**
@@ -249,7 +116,7 @@ namespace executors
*/
void close()
{
pimpl->close();
work_queue.close();
}
/**
@@ -257,7 +124,7 @@ namespace executors
*/
bool closed()
{
return pimpl->closed();
return work_queue.closed();
}
/**
@@ -271,27 +138,29 @@ namespace executors
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
// void submit(BOOST_THREAD_RV_REF(work) closure) {
// work_queue.push(boost::move(closure));
// }
void submit(BOOST_THREAD_RV_REF(work) closure) {
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
}
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
//work_queue.push(work(boost::forward<Closure>(closure)));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
/**
@@ -302,7 +171,13 @@ namespace executors
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return pimpl->reschedule_until(pred);
do {
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
/**
@@ -310,10 +185,15 @@ namespace executors
*/
void run_queued_closures()
{
pimpl->run_queued_closures();
sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
while (! q.empty())
{
work& task = q.front();
task();
q.pop_front();
}
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::loop_executor;

View File

@@ -9,123 +9,36 @@
#define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP
#include <boost/thread/executors/detail/scheduled_executor_base.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
namespace boost
{
namespace executors
{
template <class Clock = chrono::steady_clock>
class scheduled_thread_pool
class scheduled_thread_pool : public detail::scheduled_executor_base<>
{
private:
struct shared_state : public detail::scheduled_executor_base<> {
/// basic_thread_pool is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
typedef detail::scheduled_executor_base<> super;
typedef typename super::work work;
typedef scoped_thread<> thread_t;
typedef csbl::vector<thread_t> thread_vector;
thread_vector threads;
shared_state(unsigned const thread_count = thread::hardware_concurrency()+1) : super()
{
try
{
threads.reserve(thread_count);
for (unsigned i = 0; i < thread_count; ++i)
{
#if 1
thread th (&shared_state::loop, this);
threads.push_back(thread_t(boost::move(th)));
#else
threads.push_back(thread_t(&shared_state::loop, this)); // do not compile
#endif
}
}
catch (...)
{
close();
throw;
}
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~shared_state()
{
this->close();
}
}; //end class
thread_group _workers;
public:
typedef typename shared_state::work work;
typedef Clock clock;
typedef typename clock::duration duration;
typedef typename clock::time_point time_point;
/**
* \b Effects: creates a thread pool that runs closures on \c thread_count threads.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
scheduled_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
: pimpl(make_shared<shared_state>(thread_count))
scheduled_thread_pool(size_t num_threads) : super()
{
for(size_t i = 0; i < num_threads; i++)
{
_workers.create_thread(bind(&super::loop, this));
}
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
*/
~scheduled_thread_pool()
{
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
this->close();
_workers.join_all();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
void submit_at(work w, const time_point& tp)
{
return pimpl->submit_at(boost::move(w), tp);
}
void submit_after(work w, const duration& d)
{
return pimpl->submit_after(boost::move(w), d);
}
private:
shared_ptr<shared_state> pimpl;
};
typedef detail::scheduled_executor_base<> super;
}; //end class
} //end executors namespace
using executors::scheduled_thread_pool;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014-2015 Vicente J. Botet Escriba
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -13,8 +13,6 @@
#include <boost/chrono/time_point.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/chrono/system_clocks.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -38,7 +36,7 @@ namespace boost
}
private:
Executor ex;
Executor& ex;
Function funct;
};
@@ -102,8 +100,8 @@ namespace boost
}
private:
Scheduler sch;
Executor ex;
Scheduler& sch;
Executor& ex;
typename clock::time_point tp;
bool is_closed;
};
@@ -152,8 +150,8 @@ namespace boost
}
private:
Scheduler sch;
Executor ex;
Scheduler& sch;
Executor& ex;
}; //end class
/// Wraps a reference to a @c Scheduler providing an @c Executor that
@@ -210,7 +208,7 @@ namespace boost
}
private:
Scheduler sch;
Scheduler& sch;
time_point tp;
bool is_closed;
}; //end class
@@ -219,71 +217,22 @@ namespace boost
/// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor
/// that submit the work at/after a specific time/duration respectively.
template <class Clock = chrono::steady_clock>
class scheduler
class scheduler : public detail::scheduled_executor_base<Clock>
{
private:
struct shared_state : public detail::scheduled_executor_base<Clock> {
typedef detail::scheduled_executor_base<Clock> super;
typedef typename super::work work;
thread thr;
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
shared_state()
: super(),
thr(&super::loop, this) {}
~shared_state()
{
this->close();
thr.join();
}
};
public:
typedef typename shared_state::work work;
typedef typename detail::scheduled_executor_base<Clock>::work work;
typedef Clock clock;
typedef typename clock::duration duration;
typedef typename clock::time_point time_point;
scheduler()
: pimpl(make_shared<shared_state>())
{}
: super(),
thr(&super::loop, this) {}
~scheduler()
{
this->close();
thr.join();
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
void submit_at(work w, const time_point& tp)
{
return pimpl->submit_at(boost::move(w), tp);
}
void submit_after(work w, const duration& d)
{
return pimpl->submit_after(boost::move(w), d);
}
template <class Ex>
scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex)
{
@@ -301,10 +250,13 @@ namespace boost
{
return at_executor<scheduler>(*this, tp);
}
private:
shared_ptr<shared_state> pimpl;
typedef detail::scheduled_executor_base<Clock> super;
thread thr;
};
}
using executors::resubmitter;
using executors::resubmit;

View File

@@ -19,7 +19,7 @@ namespace executors
class scheduling_adpator : public detail::scheduled_executor_base<>
{
private:
Executor _exec;
Executor& _exec;
thread _scheduler;
public:

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2013,2015 Vicente J. Botet Escriba
// Copyright (C) 2013 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -14,179 +14,116 @@
#include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
template <class Executor>
class serial_executor
{
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
typedef scoped_thread<> thread_t;
struct shared_state {
typedef executors::work work;
//typedef scoped_thread<> thread_t;
typedef thread thread_t;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
generic_executor_ref ex;
thread_t thr;
/// the thread safe work queue
concurrent::sync_queue<work > work_queue;
Executor ex;
thread_t thr;
struct try_executing_one_task {
work& task;
boost::promise<void> &p;
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
try {
task();
p.set_value();
} catch (...)
{
p.set_exception(current_exception());
}
}
};
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
Executor& underlying_executor() BOOST_NOEXCEPT { return ex; }
private:
/**
* The main loop of the worker thread
*/
void worker_thread()
{
try
struct try_executing_one_task {
work& task;
boost::promise<void> &p;
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
try {
task();
p.set_value();
} catch (...)
{
for(;;)
{
work task;
queue_op_status st = work_queue.wait_pull(task);
if (st == queue_op_status::closed) return;
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
}
}
catch (...)
{
std::terminate();
return;
p.set_exception(current_exception());
}
}
public:
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
shared_state(Executor& ex)
: ex(ex), thr(&shared_state::worker_thread, this)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c shared_state destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
thr.join();
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
work_queue.close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return work_queue.closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
void submit(BOOST_THREAD_RV_REF(work) closure)
{
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
//work_queue.push(work(closure));
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
//work_queue.push(work(closure));
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
//work_queue.push(work(boost::move(closure)));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
};
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; }
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
work task;
try
{
if (work_queue.try_pull(task) == queue_op_status::success)
{
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
p.get_future().wait();
return true;
}
return false;
}
catch (...)
{
std::terminate();
//return false;
}
}
private:
/**
* Effects: schedule one task or yields
* Throws: whatever the current task constructor throws or the task() throws.
*/
void schedule_one_or_yield()
{
if ( ! try_executing_one())
{
this_thread::yield();
}
}
/**
* The main loop of the worker thread
*/
void worker_thread()
{
while (!closed())
{
schedule_one_or_yield();
}
while (try_executing_one())
{
}
}
public:
/// serial_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(serial_executor)
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
template <class Executor>
serial_executor(Executor& ex)
: pimpl(make_shared<shared_state>(ex))
: ex(ex), thr(&serial_executor::worker_thread, this)
{
}
/**
@@ -196,24 +133,8 @@ namespace executors
*/
~serial_executor()
{
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
Executor& underlying_executor() BOOST_NOEXCEPT
{
return pimpl->underlying_executor();
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
{
return false;
// signal to the worker thread that there will be no more submissions.
close();
}
/**
@@ -222,7 +143,7 @@ namespace executors
*/
void close()
{
pimpl->close();
work_queue.close();
}
/**
@@ -230,7 +151,7 @@ namespace executors
*/
bool closed()
{
return pimpl->closed();
return work_queue.closed();
}
/**
@@ -244,40 +165,47 @@ namespace executors
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
// void submit(BOOST_THREAD_RV_REF(work) closure)
// {
// work_queue.push(boost::move(closure));
// }
void submit(BOOST_THREAD_RV_REF(work) closure)
{
work_queue.push(boost::move(closure));
}
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
submit(work(closure));
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
submit(work(closure));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
work w((boost::forward<Closure>(closure)));
submit(boost::move(w));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
do {
if ( ! try_executing_one())
{
return false;
}
} while (! pred());
return true;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::serial_executor;

View File

@@ -12,20 +12,18 @@
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/generic_executor_ref.hpp>
#include <boost/thread/future.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
{
namespace executors
{
template <class Executor>
class serial_executor_cont
{
public:
@@ -33,156 +31,93 @@ namespace executors
typedef executors::work work;
private:
struct shared_state {
typedef executors::work work;
generic_executor_ref ex_;
future<void> fut_; // protected by mtx_
bool closed_; // protected by mtx_
mutex mtx_;
Executor ex_;
future<void> fut_; // protected by mtx_
bool closed_; // protected by mtx_
mutex mtx_;
struct continuation {
work task;
template <class X>
struct result {
typedef void type;
};
continuation(BOOST_THREAD_RV_REF(work) tsk)
: task(boost::move(tsk)) {}
void operator()(future<void> f)
{
try {
task();
} catch (...) {
std::terminate();
}
}
struct continuation {
work task;
template <class X>
struct result {
typedef void type;
};
bool closed(lock_guard<mutex>&) const
continuation(BOOST_THREAD_RV_REF(work) tsk)
: task(boost::move(tsk)) {}
void operator()(future<void> f)
{
return closed_;
}
public:
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
Executor& underlying_executor() BOOST_NOEXCEPT { return ex_; }
/// shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*
* \b Notes:
* * The lifetime of the associated executor must outlive the serial executor.
* * The current implementation doesn't support submission from synchronous continuation, that is,
* - the executor must execute the continuation asynchronously or
* - the continuation can not submit to this serial executor.
*/
shared_state(Executor& ex)
: ex_(ex), fut_(make_ready_future()), closed_(false)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor.
*/
~shared_state()
{
// signal to the worker thread that there will be no more submissions.
close();
}
/**
* \b Effects: close the \c serial_executor_cont for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish.
* If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads.
*
* \b Throws: \c sync_queue_is_closed if the executor is closed.
* Whatever exception that can be throw while storing the closure.
*
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
#endif
void submit(void (*closure)())
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
template <typename Closure>
void submit(BOOST_THREAD_RV_REF(Closure) closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
try {
task();
} catch (...) {
std::terminate();
}
}
};
bool closed(lock_guard<mutex>&) const
{
return closed_;
}
public:
/**
* \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; }
/// serial_executor_cont is not copyable.
BOOST_THREAD_NO_COPYABLE(serial_executor_cont)
/**
* \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor.
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*
* \b Notes:
* * The lifetime of the associated executor must outlive the serial executor.
* * The current implementation doesn't support submission from synchronous continuation, that is,
* - the executor must execute the continuation asynchronously or
* - the continuation can not submit to this serial executor.
*/
template <class Executor>
serial_executor_cont(Executor& ex)
: pimpl(make_shared<shared_state>(ex))
: ex_(ex), fut_(make_ready_future()), closed_(false)
{
}
/**
* \b Effects: Destroys the thread pool.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
* \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor_cont destructor.
*/
~serial_executor_cont()
{
// signal to the worker thread that there will be no more submissions.
close();
}
/**
* \par Returns
* The underlying executor wrapped on a generic executor reference.
* \b Effects: close the \c serial_executor_cont for submissions.
* The loop will work until there is no more closures to run.
*/
Executor& underlying_executor() BOOST_NOEXCEPT
void close()
{
return pimpl->underlying_executor();
lock_guard<mutex> lk(mtx_);
closed_ = true;;
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* Effects: none.
* Returns: always false.
* Throws: No.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
bool try_executing_one()
@@ -190,64 +125,41 @@ namespace executors
return false;
}
/**
* \b Effects: close the \c serial_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
pimpl->close();
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
{
return pimpl->closed();
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
* \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish.
* If the invoked closure throws an exception the \c serial_executor_cont will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* \b Throws: \c sync_queue_is_closed if the executor is closed.
* Whatever exception that can be throw while storing the closure.
*
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(closure)));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
fut_ = fut_.then(ex_, continuation(work(boost::forward<Closure>(closure))));
}
/**
* \b Returns: always false as a serial executor can not re-enter.
* Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks.
*/
template <typename Pred>
bool reschedule_until(Pred const& pred)
{
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
}
using executors::serial_executor_cont;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014-2015 Vicente J. Botet Escriba
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -18,9 +18,6 @@
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -32,148 +29,33 @@ namespace executors
public:
/// type-erasure to store the works to do
typedef executors::work work;
private:
bool closed_;
typedef scoped_thread<> thread_t;
typedef csbl::vector<thread_t> threads_type;
threads_type threads_;
mutable mutex mtx_;
struct shared_state {
typedef executors::work work;
bool closed_;
//typedef scoped_thread<> thread_t;
typedef thread thread_t;
typedef csbl::vector<thread_t> threads_type;
threads_type threads_;
mutable mutex mtx_;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return false;
}
/// thread_executor::shared_state is not copyable.
BOOST_THREAD_NO_COPYABLE(shared_state)
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
shared_state()
: closed_(false)
{
}
/**
* \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
~shared_state()
{
// signal to all the worker thread that there will be no more submissions.
close();
// all the scoped threads will join before destroying
join();
}
/**
* \b Effects: join all the threads.
*/
void join()
{
for (unsigned i = 0; i < threads_.size(); ++i)
{
if (this_thread::get_id() == threads_[i].get_id()) continue;
threads_[i].join();
}
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return false;
}
/**
* \b Effects: close the \c thread_executor for submissions.
* The loop will work until there is no more closures to run.
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed(lock_guard<mutex>& )
{
return closed_;
}
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
*
* \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
* If invoked closure throws an exception the \c thread_executor will call \c std::terminate, as is the case with threads.
*
* \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
*
* \b Throws: \c sync_queue_is_closed if the thread pool is closed.
* Whatever exception that can be throw while storing the closure.
*/
#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
template <typename Closure>
void submit(Closure & closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
threads_.push_back(thread_t(boost::move(th)));
}
#endif
void submit(void (*closure)())
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
threads_.push_back(thread_t(boost::move(th)));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(boost::forward<Closure>(closure));
threads_.push_back(thread_t(boost::move(th)));
}
/**
* \b Requires: This must be called from an scheduled task.
*
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const&)
{
return false;
}
};
public:
/// thread_executor is not copyable.
BOOST_THREAD_NO_COPYABLE(thread_executor)
/**
* \b Effects: creates a inline executor that runs closures immediately.
*
* \b Throws: Nothing.
*/
thread_executor()
: pimpl(make_shared<shared_state>())
: closed_(false)
{
}
/**
@@ -183,16 +65,9 @@ namespace executors
*/
~thread_executor()
{
}
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
* Throws: whatever the current task constructor throws or the task() throws.
*/
bool try_executing_one()
{
return pimpl->try_executing_one();
// signal to all the worker thread that there will be no more submissions.
close();
// all the scoped threads will join before destroying
}
/**
@@ -201,15 +76,21 @@ namespace executors
*/
void close()
{
pimpl->close();
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed(lock_guard<mutex>& )
{
return closed_;
}
bool closed()
{
return pimpl->closed();
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
/**
@@ -228,18 +109,30 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
pimpl->submit(closure);
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
threads_.push_back(thread_t(boost::move(th)));
}
#endif
void submit(void (*closure)())
{
pimpl->submit(closure);
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
threads_.push_back(thread_t(boost::move(th)));
}
template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
pimpl->submit(boost::forward<Closure>(closure));
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(boost::forward<Closure>(closure));
threads_.push_back(thread_t(boost::move(th)));
}
/**
@@ -248,14 +141,12 @@ namespace executors
* \b Effects: reschedule functions until pred()
*/
template <typename Pred>
bool reschedule_until(Pred const& p)
bool reschedule_until(Pred const&)
{
return pimpl->reschedule_until(p);
return false;
}
private:
shared_ptr<shared_state> pimpl;
};
};
}
using executors::thread_executor;
}

View File

@@ -116,9 +116,9 @@ BOOST_THREAD_INLINE_NAMESPACE(v2)
template<typename F>
friend void task_region_final(BOOST_THREAD_FWD_REF(F) f);
template <class Ex, typename F>
friend void task_region(Ex const&, BOOST_THREAD_FWD_REF(F) f);
friend void task_region(Ex&, BOOST_THREAD_FWD_REF(F) f);
template<class Ex, typename F>
friend void task_region_final(Ex const&, BOOST_THREAD_FWD_REF(F) f);
friend void task_region_final(Ex&, BOOST_THREAD_FWD_REF(F) f);
void wait_all()
{
@@ -153,20 +153,21 @@ protected:
#if defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS
task_region_handle_gen()
: canceled(false)
, ex(0)
{}
task_region_handle_gen(Executor const& ex)
task_region_handle_gen(Executor& ex)
: canceled(false)
, ex(ex)
, ex(&ex)
{}
#endif
#if ! defined BOOST_THREAD_TASK_REGION_HAS_SHARED_CANCELED && defined BOOST_THREAD_PROVIDES_EXECUTORS
task_region_handle_gen()
//: ex(0)
: ex(0)
{}
task_region_handle_gen(Executor const& ex)
: ex(ex)
task_region_handle_gen(Executor& ex)
: ex(&ex)
{}
#endif
@@ -187,7 +188,7 @@ protected:
bool canceled;
#endif
#if defined BOOST_THREAD_PROVIDES_EXECUTORS
Executor ex;
Executor* ex;
#endif
exception_list exs;
typedef csbl::vector<future<void> > group_type;
@@ -210,13 +211,13 @@ protected:
}
}
#if defined BOOST_THREAD_PROVIDES_EXECUTORS
group.push_back(async(ex, detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f))));
group.push_back(async(*ex, detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f))));
#else
group.push_back(async(detail::wrapped<task_region_handle_gen<Executor>, F>(*this, forward<F>(f))));
#endif
#else
#if defined BOOST_THREAD_PROVIDES_EXECUTORS
group.push_back(async(ex, forward<F>(f)));
group.push_back(async(*ex, forward<F>(f)));
#else
group.push_back(async(forward<F>(f)));
#endif
@@ -244,18 +245,17 @@ protected:
class task_region_handle :
public task_region_handle_gen<default_executor>
{
//default_executor tp;
default_executor tp;
template <typename F>
friend void task_region(BOOST_THREAD_FWD_REF(F) f);
template<typename F>
friend void task_region_final(BOOST_THREAD_FWD_REF(F) f);
protected:
task_region_handle()
: task_region_handle_gen<default_executor>()
task_region_handle() : task_region_handle_gen<default_executor>()
{
#if defined BOOST_THREAD_PROVIDES_EXECUTORS
//ex = &tp;
ex = &tp;
#endif
}
BOOST_DELETED_FUNCTION(task_region_handle(const task_region_handle&))
@@ -265,7 +265,7 @@ protected:
};
template <typename Executor, typename F>
void task_region_final(Executor const& ex, BOOST_THREAD_FWD_REF(F) f)
void task_region_final(Executor& ex, BOOST_THREAD_FWD_REF(F) f)
{
task_region_handle_gen<Executor> tr(ex);
try
@@ -280,7 +280,7 @@ protected:
}
template <typename Executor, typename F>
void task_region(Executor const& ex, BOOST_THREAD_FWD_REF(F) f)
void task_region(Executor& ex, BOOST_THREAD_FWD_REF(F) f)
{
task_region_final(ex, forward<F>(f));
}

File diff suppressed because it is too large Load Diff

View File

@@ -22,6 +22,7 @@ namespace boost
#ifdef BOOST_THREAD_PROVIDES_EXECUTORS
executor = 4,
#endif
inherit = 8,
any = async | deferred
}
BOOST_SCOPED_ENUM_DECLARE_END(launch)

View File

@@ -13,8 +13,7 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/pthread/condition_variable_fwd.hpp>
//#include <boost/shared_ptr.hpp>
#include <boost/thread/csbl/memory/shared_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/assert.hpp>
#ifdef BOOST_THREAD_USES_CHRONO
@@ -128,7 +127,7 @@ namespace boost
> notify_list_t;
notify_list_t notify;
typedef std::vector<csbl::shared_ptr<shared_state_base> > async_states_t;
typedef std::vector<shared_ptr<shared_state_base> > async_states_t;
async_states_t async_states_;
//#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
@@ -163,7 +162,7 @@ namespace boost
notify.push_back(std::pair<condition_variable*, mutex*>(cv, m));
}
void make_ready_at_thread_exit(csbl::shared_ptr<shared_state_base> as)
void make_ready_at_thread_exit(shared_ptr<shared_state_base> as)
{
async_states_.push_back(as);
}

View File

@@ -91,7 +91,19 @@ namespace boost
cv.wait_until(lk, t);
}
#if defined BOOST_THREAD_SLEEP_FOR_IS_STEADY && ! defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
#if defined BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC && defined BOOST_CHRONO_HAS_CLOCK_STEADY
template <class Rep, class Period>
void sleep_for(const chrono::duration<Rep, Period>& d)
{
using namespace chrono;
if (d > duration<Rep, Period>::zero())
{
steady_clock::time_point c_timeout = steady_clock::now() + ceil<nanoseconds>(d);
sleep_until(c_timeout);
}
}
#elif defined BOOST_THREAD_SLEEP_FOR_IS_STEADY
template <class Rep, class Period>
void sleep_for(const chrono::duration<Rep, Period>& d)
@@ -127,7 +139,8 @@ namespace boost
using namespace chrono;
if (d > duration<Rep, Period>::zero())
{
steady_clock::time_point c_timeout = steady_clock::now() + ceil<nanoseconds>(d);
//system_clock::time_point c_timeout = time_point_cast<system_clock::duration>(system_clock::now() + ceil<nanoseconds>(d));
system_clock::time_point c_timeout = system_clock::now() + ceil<system_clock::duration>(d);
sleep_until(c_timeout);
}
}

View File

@@ -830,7 +830,7 @@ namespace boost
}
namespace detail {
void BOOST_THREAD_DECL make_ready_at_thread_exit(csbl::shared_ptr<shared_state_base> as)
void BOOST_THREAD_DECL make_ready_at_thread_exit(shared_ptr<shared_state_base> as)
{
detail::thread_data_base* const current_thread_data(detail::get_current_thread_data());
if(current_thread_data)

View File

@@ -31,7 +31,8 @@ project
<toolset>gcc:<cxxflags>-Wno-long-long
#<toolset>gcc:<cxxflags>-ansi
#<toolset>gcc:<cxxflags>-fpermissive
<toolset>gcc:<cxxflags>-Wno-variadic-macros
<toolset>gcc-4:<cxxflags>-Wno-variadic-macros
<toolset>gcc-5:<cxxflags>-Wno-variadic-macros
#<toolset>gcc:<cxxflags>-Wunused-local-typedefs
<toolset>gcc:<cxxflags>-Wunused-function
<toolset>gcc:<cxxflags>-Wno-unused-parameter
@@ -810,10 +811,7 @@ rule thread-compile ( sources : reqs * : name )
[ thread-run2 ../example/user_scheduler.cpp : ex_user_scheduler ]
[ thread-run2 ../example/executor.cpp : ex_executor ]
[ thread-run2 ../example/generic_executor_ref.cpp : ex_generic_executor_ref ]
[ thread-run2 ../example/generic_executor.cpp : ex_generic_executor ]
[ thread-run2 ../example/generic_serial_executor.cpp : ex_generic_serial_executor ]
[ thread-run2 ../example/serial_executor.cpp : ex_serial_executor ]
#[ thread-run2 ../example/generic_serial_executor_cont.cpp : ex_generic_serial_executor_cont ]
#[ thread-run2 ../example/serial_executor_cont.cpp : ex_serial_executor_cont ]
[ thread-run2 ../example/future_when_all.cpp : ex_future_when_all ]
[ thread-run2 ../example/parallel_accumulate.cpp : ex_parallel_accumulate ]
@@ -962,12 +960,9 @@ rule thread-compile ( sources : reqs * : name )
test-suite ts_
:
#[ thread-run test_11256.cpp ]
#[ thread-run2 ../example/generic_serial_executor_cont.cpp : ex_generic_serial_executor_cont2 ]
#[ thread-run2 ../example/serial_executor_cont.cpp : ex_serial_executor_cont2 ]
#[ thread-run test_11256.cpp ]
#[ thread-run test_11499.cpp ]
#[ thread-run test_11611.cpp ]
#[ thread-run test_11633.cpp ]
;

View File

@@ -38,6 +38,27 @@ struct TestCallback
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
//boost::future<void> ff = future.get();
return boost::make_ready_future();
}
result_type operator()(boost::shared_future<void> future) const
{
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
assert(future.is_ready());
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
future.wait();
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
return boost::make_ready_future();
}
result_type operator()(boost::shared_future<boost::future<void> > future) const
{
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
assert(future.is_ready());
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
assert(future.get().is_ready());
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
//boost::future<void> ff = future.get();
return boost::make_ready_future();
}
};
@@ -130,10 +151,12 @@ int main()
BOOST_STATIC_ASSERT(std::is_same<decltype(f3), boost::future<boost::future<void> > >::value);
f3.wait();
}
#if 1
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
// fixme
for (int i=0; i< number_of_tests; i++)
{
boost::basic_thread_pool executor(1);
boost::basic_thread_pool executor(2);
auto f1 = boost::make_ready_future().then(executor, TestCallback());
BOOST_STATIC_ASSERT(std::is_same<decltype(f1), boost::future<boost::future<void> > >::value);
@@ -146,6 +169,7 @@ int main()
BOOST_STATIC_ASSERT(std::is_same<decltype(f3), boost::future<boost::future<void> > >::value);
f3.wait();
}
#endif
std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
for (int i=0; i< number_of_tests; i++)
{

View File

@@ -29,10 +29,10 @@ int main()
});
{
//boost::serial_executor_cont<boost::loop_executor> serial(ex);
boost::serial_executor<boost::loop_executor> serial(ex);
//boost::serial_executor_cont serial(ex);
boost::serial_executor serial(ex);
for (size_t i = 0; i < 1000000; i++)
for (size_t i = 0; i < 100000; i++)
serial.submit([i] {
//std::cout << i << ".";
});

View File

@@ -5,12 +5,13 @@
#define BOOST_THREAD_VERSION 2
#define BOOST_THREAD_PROVIDES_INTERRUPTIONS
#define BOOST_TEST_MODULE Boost.Threads: 2309
#include <boost/test/unit_test.hpp>
//#define BOOST_TEST_MODULE Boost.Threads: 2309
//#include <boost/test/unit_test.hpp>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/detail/lightweight_test.hpp>
using namespace std;
@@ -40,7 +41,7 @@
}
}
BOOST_AUTO_TEST_CASE(test)
void ticket_2309_test()
{
try
{
@@ -57,9 +58,13 @@
}
catch (...)
{
BOOST_CHECK(false && "exception raised");
BOOST_TEST(false && "exception raised");
}
}
int main()
{
ticket_2309_test();
}

View File

@@ -24,7 +24,7 @@
using namespace boost::chrono;
typedef boost::scheduled_thread_pool<> scheduled_tp;
typedef boost::scheduled_thread_pool scheduled_tp;
void fn(int x)
{
@@ -46,18 +46,19 @@ void func2(scheduled_tp* tp, steady_clock::duration d)
void test_timing(const int n)
{
//This function should take n seconds to execute.
boost::scheduled_thread_pool<> se(4);
boost::scheduled_thread_pool se(4);
for(int i = 1; i <= n; i++)
{
se.submit_after(boost::bind(fn,i), milliseconds(i*100));
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
//dtor is called here so all task will have to be executed before we return
}
void test_deque_timing()
{
boost::scheduled_thread_pool<> se(4);
boost::scheduled_thread_pool se(4);
for(int i = 0; i < 10; i++)
{
steady_clock::duration d = milliseconds(i*100);
@@ -84,10 +85,10 @@ void test_deque_multi(const int n)
int main()
{
//steady_clock::time_point start = steady_clock::now();
steady_clock::time_point start = steady_clock::now();
test_timing(5);
//steady_clock::duration diff = steady_clock::now() - start;
//BOOST_TEST(diff > milliseconds(500));
steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > milliseconds(500));
test_deque_timing();
test_deque_multi(4);
test_deque_multi(8);

View File

@@ -27,6 +27,7 @@ typedef boost::executors::basic_thread_pool thread_pool;
void fn(int x)
{
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl;
}
@@ -74,7 +75,7 @@ int main()
test_after(5, sch);
test_at(5, sch);
test_on(5, sch, tp);
std::cout << "[" << __LINE__ << "] " << std::endl;
boost::this_thread::sleep_for(boost::chrono::seconds(10));
return boost::report_errors();
}

View File

@@ -28,6 +28,7 @@ typedef boost::executors::basic_thread_pool thread_pool;
void fn(int x)
{
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl;
}
@@ -40,10 +41,14 @@ void test_timing(const int n)
sa.submit_after(boost::bind(fn,i),seconds(i));
sa.submit_after(boost::bind(fn,i), milliseconds(i*100));
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
}
int main()
{
steady_clock::time_point start = steady_clock::now();
test_timing(5);
steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > seconds(5));
return boost::report_errors();
}