From f7f0347780188d4d4ccaf7d008f225b30fdef1a7 Mon Sep 17 00:00:00 2001 From: "Vicente J. Botet Escriba" Date: Wed, 16 Oct 2013 06:15:51 +0000 Subject: [PATCH] Thread: first steps toward async(executor&, f). [SVN r86327] --- include/boost/thread/detail/move.hpp | 56 +++++- include/boost/thread/executor.hpp | 3 + include/boost/thread/future.hpp | 257 ++++++++++++++++++++++++- test/sync/futures/async/async_pass.cpp | 97 +++++++++- 4 files changed, 404 insertions(+), 9 deletions(-) diff --git a/include/boost/thread/detail/move.hpp b/include/boost/thread/detail/move.hpp index 4f90d3b3..fb3bd343 100644 --- a/include/boost/thread/detail/move.hpp +++ b/include/boost/thread/detail/move.hpp @@ -14,13 +14,22 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #endif #include #include #include #include - +#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES +#include +#endif namespace boost { @@ -237,8 +246,51 @@ namespace detail namespace boost -{ namespace thread_detail +{ + namespace thread_detail { +#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES + template + struct remove_reference : boost::remove_reference {}; + template + struct decay : boost::decay {}; +#else + template + struct remove_reference + { + typedef Tp type; + }; + template + struct remove_reference + { + typedef Tp type; + }; + template + struct remove_reference< rv > { + typedef Tp type; + }; + + template + struct decay + { + private: + typedef typename boost::move_detail::remove_rvalue_reference::type Up0; + typedef typename boost::remove_reference::type Up; + public: + typedef typename conditional + < + is_array::value, + typename remove_extent::type*, + typename conditional + < + is_function::value, + typename add_pointer::type, + typename remove_cv::type + >::type + >::type type; + }; +#endif + #ifndef BOOST_NO_CXX11_RVALUE_REFERENCES template typename decay::type diff --git a/include/boost/thread/executor.hpp b/include/boost/thread/executor.hpp index 7d42542e..d257ff8f 100644 --- a/include/boost/thread/executor.hpp +++ b/include/boost/thread/executor.hpp @@ -135,6 +135,9 @@ namespace boost #if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) template executor_adaptor(BOOST_THREAD_RV_REF(Args) ... args) : ex(boost::forward(args)...) {} +#else + template + executor_adaptor(BOOST_THREAD_FWD_REF(A1) a1) : ex(boost::forward(a1)) {} #endif /** * \b Effects: close the \c executor for submissions. diff --git a/include/boost/thread/future.hpp b/include/boost/thread/future.hpp index d81d5417..63345d3a 100644 --- a/include/boost/thread/future.hpp +++ b/include/boost/thread/future.hpp @@ -71,6 +71,10 @@ //#define BOOST_THREAD_VECTOR std::vector #endif +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS +#include +#endif + #if defined BOOST_THREAD_PROVIDES_FUTURE #define BOOST_THREAD_FUTURE future #else @@ -86,6 +90,9 @@ namespace boost none = 0, async = 1, deferred = 2, +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS + executor = 4, +#endif any = async | deferred } BOOST_SCOPED_ENUM_DECLARE_END(launch) @@ -256,7 +263,13 @@ namespace boost is_deferred_ = false; policy_ = launch::async; } - +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS + void set_executor() + { + is_deferred_ = false; + policy_ = launch::executor; + } +#endif waiter_list::iterator register_external_waiter(boost::condition_variable_any& cv) { boost::unique_lock lock(mutex); @@ -3565,6 +3578,248 @@ namespace boost } } +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS + namespace detail { + ///////////////////////// + /// shared_state_nullary_task + ///////////////////////// + template + struct shared_state_nullary_task + { + shared_state* that; + Fp f_; + public: + shared_state_nullary_task(shared_state* st, BOOST_THREAD_FWD_REF(Fp) f) + : that(st), f_(boost::forward(f)) + {}; + void operator()() + { + try + { + that->mark_finished_with_result(f_()); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + catch(thread_interrupted& ) + { + that->mark_interrupted_finish(); + } +#endif + catch(...) + { + that->mark_exceptional_finish(); + } + + } + }; + + template + struct shared_state_nullary_task + { + shared_state* that; + Fp f_; + public: + shared_state_nullary_task(shared_state* st, BOOST_THREAD_FWD_REF(Fp) f) + : that(st), f_(boost::forward(f)) + {}; + void operator()() + { + try + { + f_(); + that->mark_finished_with_result(); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + catch(thread_interrupted& ) + { + that->mark_interrupted_finish(); + } +#endif + catch(...) + { + that->mark_exceptional_finish(); + } + } + }; + + template + struct shared_state_nullary_task + { + shared_state* that; + Fp f_; + public: + shared_state_nullary_task(shared_state* st, BOOST_THREAD_FWD_REF(Fp) f) + : that(st), f_(boost::forward(f)) + {}; + void operator()() + { + try + { + that->mark_finished_with_result(f_()); + } +#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS + catch(thread_interrupted& ) + { + that->mark_interrupted_finish(); + } +#endif + catch(...) + { + that->mark_exceptional_finish(); + } + } + }; + + ///////////////////////// + /// future_executor_shared_state_base + ///////////////////////// + template + struct future_executor_shared_state: shared_state + { + typedef shared_state base_type; + protected: + boost::executor& ex_; + public: + template + future_executor_shared_state(boost::executor& ex, BOOST_THREAD_FWD_REF(Fp) f) + : ex_(ex) + { + this->set_executor(); + shared_state_nullary_task t(this, boost::forward(f)); + ex_.submit(boost::move(t)); + } + + ~future_executor_shared_state() + { + this->wait(false); + } + }; + + //////////////////////////////// + // make_future_executor_shared_state + //////////////////////////////// + template + BOOST_THREAD_FUTURE + make_future_executor_shared_state(executor& ex, BOOST_THREAD_FWD_REF(Fp) f) + { + shared_ptr > + h(new future_executor_shared_state(ex, boost::forward(f))); + return BOOST_THREAD_FUTURE(h); + } + + } // detail + + //////////////////////////////// + // template + // future async(executor& ex, F&&, ArgTypes&&...); + //////////////////////////////// + + #if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + #if defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR + + template + BOOST_THREAD_FUTURE + async(executor& ex, R(*f)(BOOST_THREAD_FWD_REF(ArgTypes)...), BOOST_THREAD_FWD_REF(ArgTypes)... args) + { + typedef R(*F)(BOOST_THREAD_FWD_REF(ArgTypes)...); + typedef detail::async_func::type, typename decay::type...> BF; + typedef typename BF::result_type Rp; + + return BOOST_THREAD_MAKE_RV_REF(boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + , thread_detail::decay_copy(boost::forward(args))... + ) + )); + } + #endif // defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR + + template + BOOST_THREAD_FUTURE::type( + typename decay::type... + )>::type> + async(executor& ex, BOOST_THREAD_FWD_REF(F) f, BOOST_THREAD_FWD_REF(ArgTypes)... args) + { + typedef detail::async_func::type, typename decay::type...> BF; + typedef typename BF::result_type Rp; + + return BOOST_THREAD_MAKE_RV_REF(boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + , thread_detail::decay_copy(boost::forward(args))... + ) + )); + } + #else // ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + #if defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR + + template + BOOST_THREAD_FUTURE + async(executor& ex, R(*f)()) + { + typedef R(*F)(); + typedef detail::async_func::type> BF; + typedef typename BF::result_type Rp; + + return BOOST_THREAD_MAKE_RV_REF(boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + ) + )); + } + + template + BOOST_THREAD_FUTURE + async(executor& ex, R(*f)(BOOST_THREAD_FWD_REF(A1)), BOOST_THREAD_FWD_REF(A1) a1) + { + typedef R(*F)(BOOST_THREAD_FWD_REF(A1)); + typedef detail::async_func::type, typename decay::type> BF; + typedef typename BF::result_type Rp; + + return BOOST_THREAD_MAKE_RV_REF(boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + , thread_detail::decay_copy(boost::forward(a1)) + ) + )); + } + + #endif // defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR + template + BOOST_THREAD_FUTURE::type()>::type> + async(executor& ex, BOOST_THREAD_FWD_REF(F) f) + { + typedef detail::async_func::type> BF; + typedef typename BF::result_type Rp; + + return boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + ) + ); + } + + template + BOOST_THREAD_FUTURE::type( + typename decay::type + )>::type> + async(executor& ex, BOOST_THREAD_FWD_REF(F) f, BOOST_THREAD_FWD_REF(A1) a1) + { + typedef detail::async_func::type, typename decay::type> BF; + typedef typename BF::result_type Rp; + + return BOOST_THREAD_MAKE_RV_REF(boost::detail::make_future_executor_shared_state(ex, + BF( + thread_detail::decay_copy(boost::forward(f)) + , thread_detail::decay_copy(boost::forward(a1)) + ) + )); + } + + #endif //! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + + +#endif + //////////////////////////////// // template // future async(F&&, ArgTypes&&...); diff --git a/test/sync/futures/async/async_pass.cpp b/test/sync/futures/async/async_pass.cpp index 519e46fb..0ecfa239 100644 --- a/test/sync/futures/async/async_pass.cpp +++ b/test/sync/futures/async/async_pass.cpp @@ -22,8 +22,13 @@ // future::type> // async(launch policy, F&& f, Args&&... args); +// template +// future::type> +// async(executor& ex, F&& f, Args&&... args); + //#define BOOST_THREAD_VERSION 3 #define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_PROVIDES_EXECUTORS #include #include @@ -32,6 +37,9 @@ #include #include #include +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS +#include +#endif typedef boost::chrono::high_resolution_clock Clock; typedef boost::chrono::milliseconds ms; @@ -210,6 +218,30 @@ int main() } } +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS + { + try + { + boost::executor_adaptor ex(1); + boost::future f = boost::async(ex, &f0); + boost::this_thread::sleep_for(ms(300)); + Clock::time_point t0 = Clock::now(); + BOOST_TEST(f.get() == 3); + Clock::time_point t1 = Clock::now(); + BOOST_TEST(t1 - t0 < ms(300)); + std::cout << __FILE__ << "[" << __LINE__ << "] " << (t1 - t0).count() << std::endl; + } + catch (std::exception& ex) + { + std::cout << __FILE__ << "[" << __LINE__ << "]" << ex.what() << std::endl; + BOOST_TEST(false && "exception thrown"); + } + catch (...) + { + BOOST_TEST(false && "exception thrown"); + } + } +#endif std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; { try @@ -233,17 +265,43 @@ int main() } } +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS2 + std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; + { + try + { + boost::executor_adaptor ex(1); + boost::future f = boost::async(ex, A(3)); + boost::this_thread::sleep_for(ms(300)); + Clock::time_point t0 = Clock::now(); + BOOST_TEST(f.get() == 3); + Clock::time_point t1 = Clock::now(); + BOOST_TEST(t1 - t0 < ms(300)); + std::cout << __FILE__ << "[" << __LINE__ << "] " << (t1 - t0).count() << std::endl; + } + catch (std::exception& ex) + { + std::cout << __FILE__ << "[" << __LINE__ << "]" << ex.what() << std::endl; + BOOST_TEST(false && "exception thrown"); + } + catch (...) + { + BOOST_TEST(false && "exception thrown"); + } + + } +#endif std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; { try { boost::future f = boost::async(boost::launch::async, BOOST_THREAD_MAKE_RV_REF(MoveOnly())); - // boost::this_thread::sleep_for(ms(300)); - // Clock::time_point t0 = Clock::now(); - // BOOST_TEST(f.get() == 3); - // Clock::time_point t1 = Clock::now(); - // BOOST_TEST(t1 - t0 < ms(300)); - // std::cout << __FILE__ <<"["<<__LINE__<<"] "<< (t1 - t0).count() << std::endl; + boost::this_thread::sleep_for(ms(300)); + Clock::time_point t0 = Clock::now(); + BOOST_TEST(f.get() == 3); + Clock::time_point t1 = Clock::now(); + BOOST_TEST(t1 - t0 < ms(300)); + std::cout << __FILE__ <<"["<<__LINE__<<"] "<< (t1 - t0).count() << std::endl; } catch (std::exception& ex) { @@ -255,6 +313,33 @@ int main() BOOST_TEST(false && "exception thrown"); } } +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS2 + std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; + { + try + { + boost::executor_adaptor ex(1); + MoveOnly mo; + boost::future f = boost::async(ex, boost::move(mo)); + //boost::future f = boost::async(ex, MoveOnly()); + boost::this_thread::sleep_for(ms(300)); + Clock::time_point t0 = Clock::now(); + BOOST_TEST(f.get() == 3); + Clock::time_point t1 = Clock::now(); + BOOST_TEST(t1 - t0 < ms(300)); + std::cout << __FILE__ <<"["<<__LINE__<<"] "<< (t1 - t0).count() << std::endl; + } + catch (std::exception& ex) + { + std::cout << __FILE__ << "[" << __LINE__ << "]" << ex.what() << std::endl; + BOOST_TEST(false && "exception thrown"); + } + catch (...) + { + BOOST_TEST(false && "exception thrown"); + } + } +#endif std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; { try