2
0
mirror of https://github.com/boostorg/thread.git synced 2026-02-11 12:12:12 +00:00

merge from develop.

This commit is contained in:
Vicente J. Botet Escriba
2015-05-28 19:59:13 +02:00
25 changed files with 436 additions and 149 deletions

View File

@@ -173,7 +173,6 @@ namespace boost
private:
bool start_thread_noexcept();
bool start_thread_noexcept(const attributes& attr);
//public:
void start_thread()
{
if (!start_thread_noexcept())

View File

@@ -13,7 +13,7 @@
#include <boost/thread/detail/config.hpp>
#include <boost/thread/detail/delete.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/concurrent_queues/sync_queue.hpp>
#include <boost/thread/executors/work.hpp>
#include <boost/thread/csbl/vector.hpp>

View File

@@ -137,7 +137,7 @@ namespace boost
continuations_type continuations;
// This declaration should be only included conditionally, but is included to maintain the same layout.
virtual void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base>)
virtual void launch_continuation()
{
}
@@ -150,6 +150,19 @@ namespace boost
policy_(launch::none),
continuations()
{}
shared_state_base(exceptional_ptr const& ex):
exception(ex.ptr_),
done(true),
is_valid_(true),
is_deferred_(false),
is_constructed(false),
cnt_(0),
policy_(launch::none),
continuations()
{}
virtual ~shared_state_base()
{
BOOST_ASSERT(cnt_==0);
@@ -221,8 +234,7 @@ namespace boost
continuations.clear();
relocker rlk(lock);
for (continuations_type::iterator it = the_continuations.begin(); it != the_continuations.end(); ++it) {
boost::unique_lock<boost::mutex> cont_lock((*it)->mutex);
(*it)->launch_continuation(cont_lock, *it);
(*it)->launch_continuation();
}
}
}
@@ -407,7 +419,7 @@ namespace boost
return policy_;
}
future_state::state get_state(boost::unique_lock<boost::mutex>& lk) const
future_state::state get_state(boost::unique_lock<boost::mutex>&) const
{
if(!done)
{
@@ -483,6 +495,10 @@ namespace boost
shared_state():
result()
{}
shared_state(exceptional_ptr const& ex):
detail::shared_state_base(ex), result()
{}
~shared_state()
{}
@@ -624,6 +640,10 @@ namespace boost
result(0)
{}
shared_state(exceptional_ptr const& ex):
detail::shared_state_base(ex), result(0)
{}
~shared_state()
{
}
@@ -687,6 +707,10 @@ namespace boost
shared_state()
{}
shared_state(exceptional_ptr const& ex):
detail::shared_state_base(ex)
{}
void mark_finished_with_result_internal(boost::unique_lock<boost::mutex>& lock)
{
mark_finished_internal(lock);
@@ -1150,16 +1174,7 @@ namespace boost
static //BOOST_CONSTEXPR
future_ptr make_exceptional_future_ptr(exceptional_ptr const& ex) {
promise<R> p;
p.set_exception(ex.ptr_);
return p.get_future().future_;
}
void set_exceptional_if_invalid() {
if (valid()) return;
promise<R> p;
p.set_exception(future_uninitialized());
future_ = p.get_future().future_;
return future_ptr(new detail::shared_state<R>(ex));
}
future_ptr future_;
@@ -2867,7 +2882,7 @@ namespace boost
private:
task_shared_state(task_shared_state&);
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
typedef R (*CallableType)(BOOST_THREAD_RV_REF(ArgTypes) ... );
typedef R (*CallableType)(ArgTypes ... );
#else
typedef R (*CallableType)();
#endif
@@ -3098,7 +3113,7 @@ namespace boost
private:
task_shared_state(task_shared_state&);
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
typedef void (*CallableType)(BOOST_THREAD_RV_REF(ArgTypes)...);
typedef void (*CallableType)(ArgTypes...);
#else
typedef void (*CallableType)();
#endif
@@ -3421,7 +3436,7 @@ namespace boost
// execution
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
void operator()(BOOST_THREAD_RV_REF(ArgTypes)... args) {
void operator()(ArgTypes... args) {
if(!task) {
boost::throw_exception(task_moved());
}
@@ -4092,12 +4107,6 @@ namespace detail {
}
#endif
template <typename T>
BOOST_THREAD_FUTURE<T> make_ready_future(exception_ptr ex) {
promise<T> p;
p.set_exception(ex);
return BOOST_THREAD_MAKE_RV_REF(p.get_future());
}
template <typename T>
BOOST_THREAD_FUTURE<T> make_exceptional_future(exception_ptr ex) {
@@ -4119,16 +4128,9 @@ namespace detail {
p.set_exception(boost::current_exception());
return BOOST_THREAD_MAKE_RV_REF(p.get_future());
}
template <typename T>
BOOST_THREAD_FUTURE<T> make_exceptional_future_if_invalid(BOOST_THREAD_FWD_REF(BOOST_THREAD_FUTURE<T>) fut) {
fut.set_exceptional_if_invalid();
return boost::move(fut);
}
template <typename T>
shared_future<T> make_exceptional_future_if_invalid(shared_future<T> fut) {
fut.set_exceptional_if_invalid();
return fut;
BOOST_THREAD_FUTURE<T> make_ready_future(exception_ptr ex) {
return make_exceptional_future<T>(ex);
}
#if 0
@@ -4187,8 +4189,9 @@ namespace detail
}
void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) {
this->thr_ = thread(&future_async_continuation_shared_state::run, that);
void launch_continuation() {
boost::lock_guard<boost::mutex> lk(this->mutex);
this->thr_ = thread(&future_async_continuation_shared_state::run, this->shared_from_this());
}
static void run(shared_ptr<boost::detail::shared_state_base> that_) {
@@ -4218,8 +4221,9 @@ namespace detail
centinel(parent.future_) {
}
void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) {
this->thr_ = thread(&future_async_continuation_shared_state::run, that);
void launch_continuation() {
boost::lock_guard<boost::mutex> lk(this->mutex);
this->thr_ = thread(&future_async_continuation_shared_state::run, this->shared_from_this());
}
static void run(shared_ptr<boost::detail::shared_state_base> that_) {
@@ -4269,9 +4273,8 @@ namespace detail
this->set_executor();
}
void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that ) {
relocker relock(lck);
run_it<future_executor_continuation_shared_state> fct(that);
void launch_continuation() {
run_it<future_executor_continuation_shared_state> fct(this->shared_from_this());
ex->submit(fct);
}
@@ -4308,9 +4311,8 @@ namespace detail
this->set_executor();
}
void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that ) {
relocker relock(lck);
run_it<future_executor_continuation_shared_state> fct(that);
void launch_continuation() {
run_it<future_executor_continuation_shared_state> fct(this->shared_from_this());
ex->submit(fct);
}
@@ -4351,8 +4353,9 @@ namespace detail
centinel(parent.future_) {
}
void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) {
this->thr_ = thread(&shared_future_async_continuation_shared_state::run, that);
void launch_continuation() {
boost::lock_guard<boost::mutex> lk(this->mutex);
this->thr_ = thread(&shared_future_async_continuation_shared_state::run, this->shared_from_this());
}
static void run(shared_ptr<boost::detail::shared_state_base> that_) {
@@ -4381,8 +4384,9 @@ namespace detail
centinel(parent.future_) {
}
void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) {
this->thr_ = thread(&shared_future_async_continuation_shared_state::run, that);
void launch_continuation() {
boost::lock_guard<boost::mutex> lk(this->mutex);
this->thr_ = thread(&shared_future_async_continuation_shared_state::run, this->shared_from_this());
}
static void run(shared_ptr<boost::detail::shared_state_base> that_) {
@@ -4419,9 +4423,8 @@ namespace detail
this->set_executor();
}
void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that) {
relocker relock(lck);
run_it<shared_future_executor_continuation_shared_state> fct(that);
void launch_continuation() {
run_it<shared_future_executor_continuation_shared_state> fct(this->shared_from_this());
ex->submit(fct);
}
@@ -4457,9 +4460,8 @@ namespace detail
centinel(parent.future_) {
}
void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that) {
relocker relock(lck);
run_it<shared_future_executor_continuation_shared_state> fct(that);
void launch_continuation() {
run_it<shared_future_executor_continuation_shared_state> fct(this->shared_from_this());
ex->submit(fct);
}
@@ -4499,7 +4501,8 @@ namespace detail
this->set_deferred();
}
virtual void launch_continuation(boost::unique_lock<boost::mutex>&lk, shared_ptr<shared_state_base> ) {
virtual void launch_continuation() {
boost::unique_lock<boost::mutex> lk(this->mutex);
if (this->is_deferred_) {
this->is_deferred_=false;
this->execute(lk);
@@ -4537,7 +4540,8 @@ namespace detail
~future_deferred_continuation_shared_state() {
}
virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) {
virtual void launch_continuation() {
boost::unique_lock<boost::mutex> lk(this->mutex);
if (this->is_deferred_) {
this->is_deferred_=false;
this->execute(lk);
@@ -4576,7 +4580,8 @@ namespace detail
this->set_deferred();
}
virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) {
virtual void launch_continuation() {
boost::unique_lock<boost::mutex> lk(this->mutex);
if (this->is_deferred_) {
this->is_deferred_=false;
this->execute(lk);
@@ -4612,7 +4617,8 @@ namespace detail
this->set_deferred();
}
virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) {
virtual void launch_continuation() {
boost::unique_lock<boost::mutex> lk(this->mutex);
if (this->is_deferred_) {
this->is_deferred_=false;
this->execute(lk);
@@ -4826,14 +4832,17 @@ namespace detail
boost::unique_lock<boost::mutex> lock(this->future_->mutex);
if (underlying_cast<int>(policy) & int(launch::async)) {
lock.unlock();
return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
)));
} else if (underlying_cast<int>(policy) & int(launch::deferred)) {
lock.unlock();
return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_deferred_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
)));
} else {
lock.unlock();
return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
)));
@@ -4849,6 +4858,7 @@ namespace detail
BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized());
boost::unique_lock<boost::mutex> lock(this->future_->mutex);
lock.unlock();
return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_executor_continuation_shared_state<Ex, BOOST_THREAD_FUTURE<R>, future_type, F>(ex,
lock, boost::move(*this), boost::forward<F>(func)
)));
@@ -4864,15 +4874,18 @@ namespace detail
boost::unique_lock<boost::mutex> lock(this->future_->mutex);
if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::async)) {
lock.unlock();
return boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
);
} else if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::deferred)) {
this->future_->wait_internal(lock);
lock.unlock();
return boost::detail::make_future_deferred_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
);
} else {
lock.unlock();
return boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>(
lock, boost::move(*this), boost::forward<F>(func)
);
@@ -5013,41 +5026,87 @@ namespace detail
template<typename F, typename Rp>
struct future_unwrap_shared_state: shared_state<Rp>
{
F parent;
F wrapped;
typename F::value_type unwrapped;
public:
explicit future_unwrap_shared_state(BOOST_THREAD_RV_REF(F) f)
: parent(boost::move(f)) {}
typename F::value_type parent_value(boost::unique_lock<boost::mutex>& ) {
typename F::value_type r = parent.get();
r.set_exceptional_if_invalid();
return boost::move(r);
: wrapped(boost::move(f)) {
}
virtual void wait(boost::unique_lock<boost::mutex>& lk, bool ) { // todo see if rethrow must be used
parent_value(lk).wait();
}
virtual Rp get(boost::unique_lock<boost::mutex>& lk) {
return parent_value(lk).get();
}
#if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
typedef shared_ptr<shared_state_base> continuation_ptr_type;
virtual void set_continuation_ptr(continuation_ptr_type continuation, boost::unique_lock<boost::mutex>& lock)
void launch_continuation()
{
boost::unique_lock<boost::mutex> lk(parent.future_->mutex);
parent.future_->set_continuation_ptr(continuation, lk);
boost::unique_lock<boost::mutex> lk(this->mutex);
if (! unwrapped.valid() )
{
if (wrapped.has_exception()) {
this->mark_exceptional_finish_internal(wrapped.get_exception_ptr(), lk);
} else {
unwrapped = wrapped.get();
if (unwrapped.valid())
{
lk.unlock();
boost::unique_lock<boost::mutex> lk2(unwrapped.future_->mutex);
unwrapped.future_->set_continuation_ptr(this->shared_from_this(), lk2);
} else {
this->mark_exceptional_finish_internal(boost::copy_exception(future_uninitialized()), lk);
}
}
} else {
if (unwrapped.has_exception()) {
this->mark_exceptional_finish_internal(unwrapped.get_exception_ptr(), lk);
} else {
this->mark_finished_with_result_internal(unwrapped.get(), lk);
}
}
}
#endif
};
template<typename F>
struct future_unwrap_shared_state<F,void>: shared_state<void>
{
F wrapped;
typename F::value_type unwrapped;
public:
explicit future_unwrap_shared_state(BOOST_THREAD_RV_REF(F) f)
: wrapped(boost::move(f)) {
}
void launch_continuation()
{
boost::unique_lock<boost::mutex> lk(this->mutex);
if (! unwrapped.valid() )
{
if (wrapped.has_exception()) {
this->mark_exceptional_finish_internal(wrapped.get_exception_ptr(), lk);
} else {
unwrapped = wrapped.get();
if (unwrapped.valid())
{
lk.unlock();
boost::unique_lock<boost::mutex> lk2(unwrapped.future_->mutex);
unwrapped.future_->set_continuation_ptr(this->shared_from_this(), lk2);
} else {
this->mark_exceptional_finish_internal(boost::copy_exception(future_uninitialized()), lk);
}
}
} else {
if (unwrapped.has_exception()) {
this->mark_exceptional_finish_internal(unwrapped.get_exception_ptr(), lk);
} else {
this->mark_finished_with_result_internal(lk);
}
}
}
};
template <class F, class Rp>
BOOST_THREAD_FUTURE<Rp>
make_future_unwrap_shared_state(boost::unique_lock<boost::mutex> &lock, BOOST_THREAD_RV_REF(F) f) {
shared_ptr<future_unwrap_shared_state<F, Rp> >
h(new future_unwrap_shared_state<F, Rp>(boost::move(f)));
lock.lock();
h->parent.future_->set_continuation_ptr(h, lock);
h->wrapped.future_->set_continuation_ptr(h, lock);
lock.unlock();
return BOOST_THREAD_FUTURE<Rp>(h);
}

View File

@@ -120,6 +120,15 @@ namespace boost
unique_lock<mutex>& m,
duration_type const& wait_duration)
{
if (wait_duration.is_pos_infinity())
{
wait(m); // or do_wait(m,detail::timeout::sentinel());
return true;
}
if (wait_duration.is_special())
{
return true;
}
return timed_wait(m,get_system_time()+wait_duration);
}
@@ -149,6 +158,18 @@ namespace boost
unique_lock<mutex>& m,
duration_type const& wait_duration,predicate_type pred)
{
if (wait_duration.is_pos_infinity())
{
while (!pred())
{
wait(m); // or do_wait(m,detail::timeout::sentinel());
}
return true;
}
if (wait_duration.is_special())
{
return pred();
}
return timed_wait(m,get_system_time()+wait_duration,pred);
}
#endif

View File

@@ -339,8 +339,8 @@ namespace boost
{
if (wait_duration.is_pos_infinity())
{
wait(m); // or do_wait(m,detail::timeout::sentinel());
return true;
wait(m); // or do_wait(m,detail::timeout::sentinel());
return true;
}
if (wait_duration.is_special())
{
@@ -362,6 +362,18 @@ namespace boost
template<typename duration_type,typename predicate_type>
bool timed_wait(unique_lock<mutex>& m,duration_type const& wait_duration,predicate_type pred)
{
if (wait_duration.is_pos_infinity())
{
while (!pred())
{
wait(m); // or do_wait(m,detail::timeout::sentinel());
}
return true;
}
if (wait_duration.is_special())
{
return pred();
}
return do_wait(m,wait_duration.total_milliseconds(),pred);
}
#endif