diff --git a/example/executor.cpp b/example/executor.cpp index 3a1bac13..cd550dce 100644 --- a/example/executor.cpp +++ b/example/executor.cpp @@ -67,12 +67,13 @@ void submit_some(boost::executor& tp) } + void at_th_entry(boost::basic_thread_pool& ) { } -int main() +int test_executor_adaptor() { // std::cout << BOOST_CONTEXTOF << std::endl; { @@ -144,3 +145,9 @@ int main() // std::cout << BOOST_CONTEXTOF << std::endl; return 0; } + + +int main() +{ + return test_executor_adaptor(); +} diff --git a/example/generic_executor_ref.cpp b/example/generic_executor_ref.cpp new file mode 100644 index 00000000..4d2aa7f1 --- /dev/null +++ b/example/generic_executor_ref.cpp @@ -0,0 +1,156 @@ +// Copyright (C) 2014 Vicente Botet +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if ! defined BOOST_NO_CXX11_DECLTYPE +#define BOOST_RESULT_OF_USE_DECLTYPE +#endif +#ifndef BOOST_NO_CXX11_DECLTYPE_N3276 +#define BOOST_THREAD_NO_CXX11_DECLTYPE_N3276 +#endif + +#define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_PROVIDES_EXECUTORS +//#define BOOST_THREAD_USES_LOG +#define BOOST_THREAD_USES_LOG_THREAD_ID +#define BOOST_THREAD_QUEUE_DEPRECATE_OLD + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +void p1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + //boost::this_thread::sleep_for(boost::chrono::milliseconds(200)); +} + +void p2() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + //boost::this_thread::sleep_for(boost::chrono::seconds(10)); +} + +int f1() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + return 1; +} +int f2(int i) +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + boost::this_thread::sleep_for(boost::chrono::seconds(2)); + return i + 1; +} + +void submit_some(boost::generic_executor_ref tp) +{ + for (int i = 0; i < 3; ++i) { + tp.submit(&p2); + } + for (int i = 0; i < 3; ++i) { + tp.submit(&p1); + } + +} + +void at_th_entry(boost::basic_thread_pool& ) +{ + +} + + + +int test_generic_executor_ref() +{ + // std::cout << BOOST_CONTEXTOF << std::endl; + { + try + { + { + boost::basic_thread_pool ea(4); + submit_some( ea); + { + boost::future t1 = boost::async(ea, &f1); + boost::future t2 = boost::async(ea, &f1); + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl; + } + submit_some(ea); + { + boost::basic_thread_pool ea3(1); + boost::future t1 = boost::async(ea3, &f1); + boost::future t2 = boost::async(ea3, &f1); + //boost::future t2 = boost::async(ea3, f2, 1); // todo this doesn't compiles yet on C++11 + //boost::future t2 = boost::async(ea3, boost::bind(f2, 1)); // todo this doesn't compiles yet on C++98 + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + // std::cout << BOOST_CONTEXTOF << " t2= " << t2.get() << std::endl; + } + submit_some(ea); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::loop_executor ea2; + submit_some( ea2); + ea2.run_queued_closures(); + } +#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea1(4); + boost::serial_executor ea2(ea1); + submit_some(ea2); + } +#endif + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::inline_executor ea1; + submit_some(ea1); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + //boost::thread_executor ea1; + //submit_some(ea1); + } + // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::basic_thread_pool ea(4, at_th_entry); + boost::future t1 = boost::async(ea, &f1); + // std::cout << BOOST_CONTEXTOF << " t1= " << t1.get() << std::endl; + } + } + catch (std::exception& ex) + { + std::cout << "ERROR= " << ex.what() << "" << std::endl; + return 1; + } + catch (...) + { + std::cout << " ERROR= exception thrown" << std::endl; + return 2; + } + } + // std::cout << BOOST_CONTEXTOF << std::endl; + return 0; +} + + +int main() +{ + return test_generic_executor_ref(); + + +} diff --git a/example/producer_consumer.cpp b/example/producer_consumer.cpp index ba24336b..fc397a8b 100644 --- a/example/producer_consumer.cpp +++ b/example/producer_consumer.cpp @@ -87,25 +87,24 @@ void consumer2(the_ostream &mos, boost::sync_queue & sbq) mos << "exception !!!\n"; } } -//void consumer3(the_ostream &mos, boost::sync_queue & sbq) -//{ -// using namespace boost; -// bool closed=false; -// try { -// for(int i=0; ;++i) -// { -// int r; -// queue_op_status res = sbq.wait_and_pull(r); -// if (res==queue_op_status::closed) break; -// mos << i << " wait_and_pull(" << r << ")\n"; -// this_thread::sleep_for(chrono::milliseconds(250)); -// } -// } -// catch(...) -// { -// mos << "exception !!!\n"; -// } -//} +void consumer3(the_ostream &mos, boost::sync_queue & sbq) +{ + using namespace boost; + try { + for(int i=0; ;++i) + { + int r; + queue_op_status res = sbq.wait_pull_front(r); + if (res==queue_op_status::closed) break; + mos << i << " wait_pull_front(" << r << ")\n"; + this_thread::sleep_for(chrono::milliseconds(250)); + } + } + catch(...) + { + mos << "exception !!!\n"; + } +} int main() { diff --git a/example/producer_consumer2.cpp b/example/producer_consumer2.cpp new file mode 100644 index 00000000..7c4f1ded --- /dev/null +++ b/example/producer_consumer2.cpp @@ -0,0 +1,145 @@ +// (C) Copyright 2014 Vicente Botet +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// adapted from the example given by Howard Hinnant in + +#define BOOST_THREAD_VERSION 4 +//#define BOOST_THREAD_QUEUE_DEPRECATE_OLD + +#include +#include +#ifdef XXXX +#include + typedef boost::externally_locked_stream the_ostream; +#else + typedef std::ostream the_ostream; + typedef std::istream the_istream; +#endif +#include +#include +#include + +void producer(the_ostream &mos, boost::queue_back::type sbq) +{ + using namespace boost; + try { + for(int i=0; ;++i) + { + sbq.push(i); + //sbq << i; + mos << "push(" << i << ") "<< sbq.size()<<"\n"; + this_thread::sleep_for(chrono::milliseconds(200)); + } + } + catch(sync_queue_is_closed&) + { + mos << "closed !!!\n"; + } + catch(...) + { + mos << "exception !!!\n"; + } +} + +void consumer( + the_ostream &mos, + boost::queue_front::type sbq) +{ + using namespace boost; + try { + for(int i=0; ;++i) + { + int r; + sbq.pull(r); + //sbq >> r; + mos << i << " pull(" << r << ") "<< sbq.size()<<"\n"; + + this_thread::sleep_for(chrono::milliseconds(250)); + } + } + catch(sync_queue_is_closed&) + { + mos << "closed !!!\n"; + } + catch(...) + { + mos << "exception !!!\n"; + } +} +void consumer2(the_ostream &mos, boost::queue_front::type sbq) +{ + using namespace boost; + try { + for(int i=0; ;++i) + { + int r; + queue_op_status st = sbq.try_pull(r); + if (queue_op_status::closed == st) break; + if (queue_op_status::success == st) { + mos << i << " try_pull(" << r << ")\n"; + } + this_thread::sleep_for(chrono::milliseconds(250)); + } + } + catch(...) + { + mos << "exception !!!\n"; + } +} +void consumer3(the_ostream &mos, boost::queue_front::type sbq) +{ + using namespace boost; + try { + for(int i=0; ;++i) + { + int r; + queue_op_status res = sbq.wait_pull(r); + if (res==queue_op_status::closed) break; + mos << i << " wait_pull(" << r << ")\n"; + this_thread::sleep_for(chrono::milliseconds(250)); + } + } + catch(...) + { + mos << "exception !!!\n"; + } +} + +int main() +{ + using namespace boost; + +#ifdef XXXX + recursive_mutex terminal_mutex; + + externally_locked_stream mcerr(std::cerr, terminal_mutex); + externally_locked_stream mcout(std::cout, terminal_mutex); + externally_locked_stream mcin(std::cin, terminal_mutex); +#else + the_ostream &mcerr = std::cout; + the_ostream &mcout = std::cerr; + //the_istream &mcin = std::cin; +#endif + + queue_adaptor > sbq; + //sync_queue sbq; + + { + mcout << "begin of main" << std::endl; + scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back::type(sbq))); + scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back::type(sbq))); + scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), concurrent::queue_front::type(sbq))); + + this_thread::sleep_for(chrono::seconds(1)); + + mcout << "closed()" << std::endl; + sbq.close(); + mcout << "closed()" << std::endl; + + } // all threads joined here. + mcout << "end of main" << std::endl; + return 0; +} + diff --git a/include/boost/thread/concurrent_queues/queue_adaptor.hpp b/include/boost/thread/concurrent_queues/queue_adaptor.hpp new file mode 100644 index 00000000..3259c284 --- /dev/null +++ b/include/boost/thread/concurrent_queues/queue_adaptor.hpp @@ -0,0 +1,75 @@ +#ifndef BOOST_THREAD_QUEUE_ADAPTOR_HPP +#define BOOST_THREAD_QUEUE_ADAPTOR_HPP + +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Vicente J. Botet Escriba 2014. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/thread for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include + +#include + +namespace boost +{ +namespace concurrent +{ + + template + class queue_adaptor : public queue_base + { + Queue queue; + public: + typedef typename Queue::value_type value_type; + typedef std::size_t size_type; + + // Constructors/Assignment/Destructors + + queue_adaptor() {} + + // Observers + bool empty() const { return queue.empty(); } + bool full() const { return queue.full(); } + size_type size() const { return queue.size(); } + bool closed() const { return queue.closed(); } + + // Modifiers + void close() { queue.close(); } + + void push_back(const value_type& x) { queue.push_back(x); } + void push_back(BOOST_THREAD_RV_REF(value_type) x) { queue.push_back(boost::forward(x)); } + + void pull_front(value_type& x) { queue.pull_front(x); }; + // enable_if is_nothrow_copy_movable + value_type pull_front() { return queue.pull_front(); } + + queue_op_status try_push_back(const value_type& x) { return queue.try_push_back(x); } + queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x) { return queue.try_push_back(boost::forward(x)); } + queue_op_status try_pull_front(value_type& x) { return queue.try_pull_front(x); } + + queue_op_status nonblocking_push_back(const value_type& x) { return queue.nonblocking_push_back(x); } + queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x) { return queue.nonblocking_push_back(boost::forward(x)); } + queue_op_status nonblocking_pull_front(value_type& x) { return queue.nonblocking_pull_front(x); } + + queue_op_status wait_push_back(const value_type& x) { return queue.wait_push_back(x); } + queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x) { return queue.wait_push_back(boost::forward(x)); } + queue_op_status wait_pull_front(value_type& x) { return queue.wait_pull_front(x); } + + }; + +} +using concurrent::queue_adaptor; + +} + +#include + +#endif diff --git a/include/boost/thread/concurrent_queues/queue_base.hpp b/include/boost/thread/concurrent_queues/queue_base.hpp new file mode 100755 index 00000000..68e3f9e8 --- /dev/null +++ b/include/boost/thread/concurrent_queues/queue_base.hpp @@ -0,0 +1,73 @@ +#ifndef BOOST_THREAD_QUEUE_BASE_HPP +#define BOOST_THREAD_QUEUE_BASE_HPP + +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Vicente J. Botet Escriba 2014. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/thread for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include + +#include + +namespace boost +{ +namespace concurrent +{ + + template + class queue_base + { + public: + typedef ValueType value_type; + typedef std::size_t size_type; + + // Constructors/Assignment/Destructors + virtual ~queue_base() {}; + + // Observers + virtual bool empty() const = 0; + virtual bool full() const = 0; + virtual size_type size() const = 0; + virtual bool closed() const = 0; + + // Modifiers + virtual void close() = 0; + + virtual void push_back(const value_type& x) = 0; + virtual void push_back(BOOST_THREAD_RV_REF(value_type) x) = 0; + + virtual void pull_front(value_type&) = 0; + // enable_if is_nothrow_copy_movable + virtual value_type pull_front() = 0; + + virtual queue_op_status try_push_back(const value_type& x) = 0; + virtual queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x) = 0; + virtual queue_op_status try_pull_front(value_type&) = 0; + + virtual queue_op_status nonblocking_push_back(const value_type& x) = 0; + virtual queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x) = 0; + virtual queue_op_status nonblocking_pull_front(value_type&) = 0; + + virtual queue_op_status wait_push_back(const value_type& x) = 0; + virtual queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x) = 0; + virtual queue_op_status wait_pull_front(ValueType& elem) = 0; + + }; + + +} +using concurrent::queue_base; + +} + +#include + +#endif diff --git a/include/boost/thread/concurrent_queues/queue_op_status.hpp b/include/boost/thread/concurrent_queues/queue_op_status.hpp new file mode 100644 index 00000000..abc3229d --- /dev/null +++ b/include/boost/thread/concurrent_queues/queue_op_status.hpp @@ -0,0 +1,40 @@ +#ifndef BOOST_THREAD_QUEUE_OP_STATUS_HPP +#define BOOST_THREAD_QUEUE_OP_STATUS_HPP + +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Vicente J. Botet Escriba 2014. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/thread for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#include +#include + +#include + +namespace boost +{ +namespace concurrent +{ + + BOOST_SCOPED_ENUM_DECLARE_BEGIN(queue_op_status) + { success = 0, empty, full, closed, busy } + BOOST_SCOPED_ENUM_DECLARE_END(queue_op_status) +} + +#ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD + struct no_block_tag{}; + BOOST_CONSTEXPR_OR_CONST no_block_tag no_block = {}; +#endif + +using concurrent::queue_op_status; + +} + +#include + +#endif diff --git a/include/boost/thread/concurrent_queues/queue_views.hpp b/include/boost/thread/concurrent_queues/queue_views.hpp new file mode 100644 index 00000000..ec488b38 --- /dev/null +++ b/include/boost/thread/concurrent_queues/queue_views.hpp @@ -0,0 +1,157 @@ +#ifndef BOOST_THREAD_QUEUE_VIEWS_HPP +#define BOOST_THREAD_QUEUE_VIEWS_HPP + +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Vicente J. Botet Escriba 2014. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/thread for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include + +#include + +namespace boost +{ +namespace concurrent +{ + + template + class queue_back_view + { + Queue& queue; + public: + typedef typename Queue::value_type value_type; + typedef typename Queue::size_type size_type; + + // Constructors/Assignment/Destructors + queue_back_view(Queue& q) BOOST_NOEXCEPT : queue(q) {} + + // Observers + bool empty() const { return queue.empty(); } + bool full() const { return queue.full(); } + size_type size() const { return queue.size(); } + bool closed() const { return queue.closed(); } + + // Modifiers + void close() { queue.close(); } + + void push(const value_type& x) { queue.push_back(x); } + void push(BOOST_THREAD_RV_REF(value_type) x) { queue.push_back(forward(x)); } + + void pull(value_type& x) { queue.pull_back(x); } + // enable_if is_nothrow_copy_movable + value_type pull() { return queue.pull_back(); } + + queue_op_status try_push(const value_type& x) { return queue.try_push_back(x); } + queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.try_push_back(forward(x)); } + + queue_op_status try_pull(value_type& x) { return queue.try_pull_back(x); } + + queue_op_status nonblocking_push(const value_type& x) { return queue.nonblocking_push_back(x); } + queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.nonblocking_push_back(forward(x)); } + + queue_op_status nonblocking_pull(value_type& x) { return queue.nonblocking_pull_back(x); } + + queue_op_status wait_push(const value_type& x) { return queue.wait_push_back(x); } + queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.wait_push_back(forward(x)); } + queue_op_status wait_pull_front(value_type& x) { return queue.wait_pull_back(x); } + + }; + + template + class queue_front_view + { + Queue& queue; + public: + typedef typename Queue::value_type value_type; + typedef typename Queue::size_type size_type; + + // Constructors/Assignment/Destructors + queue_front_view(Queue& q) BOOST_NOEXCEPT : queue(q) {} + + // Observers + bool empty() const { return queue.empty(); } + bool full() const { return queue.full(); } + size_type size() const { return queue.size(); } + bool closed() const { return queue.closed(); } + + // Modifiers + void close() { queue.close(); } + + void push(const value_type& x) { queue.push_front(x); } + void push(BOOST_THREAD_RV_REF(value_type) x) { queue.push_front(forward(x)); } + + void pull(value_type& x) { queue.pull_front(x); }; + // enable_if is_nothrow_copy_movable + value_type pull() { return queue.pull_front(); } + + queue_op_status try_push(const value_type& x) { return queue.try_push_front(x); } + queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.try_push_front(forward(x)); } + + queue_op_status try_pull(value_type& x) { return queue.try_pull_front(x); } + + queue_op_status nonblocking_push(const value_type& x) { return queue.nonblocking_push_front(x); } + queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.nonblocking_push_front(forward(x)); } + + queue_op_status nonblocking_pull(value_type& x) { return queue.nonblocking_pull_front(x); } + + queue_op_status wait_push(const value_type& x) { return queue.wait_push_front(x); } + queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x) { return queue.wait_push_front(forward(x)); } + queue_op_status wait_pull(value_type& x) { return queue.wait_pull_front(x); } + + }; + +#if 0 + template + using queue_back = queue_back_view > ; + template + using queue_front = queue_front_view > ; +#else + template + struct queue_back { + typedef queue_back_view > type; + }; + template + struct queue_front { + typedef queue_front_view > type; + }; + +#endif + +// template +// queue_back_view back(Queue & q) { return queue_back_view(q); } +// template +// queue_front_view front(Queue & q) { return queue_front_view(q); } +//#if 0 +// template +// queue_back back(queue_base & q) { return queue_back(q); } +// template +// queue_front front(queue_base & q) { return queue_front(q); } +//#else +// template +// typename queue_back::type back(queue_base & q) { return typename queue_back::type(q); } +// template +// typename queue_front::type front(queue_base & q) { return typename queue_front::type(q); } +//#endif +} + +using concurrent::queue_back_view; +using concurrent::queue_front_view; +using concurrent::queue_back; +using concurrent::queue_front; +//using concurrent::back; +//using concurrent::front; + +} + +#include + +#endif diff --git a/include/boost/thread/executors/basic_thread_pool.hpp b/include/boost/thread/executors/basic_thread_pool.hpp index df350d6e..9f477e6b 100644 --- a/include/boost/thread/executors/basic_thread_pool.hpp +++ b/include/boost/thread/executors/basic_thread_pool.hpp @@ -264,33 +264,20 @@ namespace executors template void submit(Closure & closure) { - //work w ((closure)); - //work_queue.push_back(boost::move(w)); - work_queue.push_back(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } #endif void submit(void (*closure)()) { - //work w ((closure)); - //work_queue.push_back(boost::move(w)); - work_queue.push_back(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } -#if 0 template void submit(BOOST_THREAD_RV_REF(Closure) closure) - { - work w = boost::move(closure); - work_queue.push_back(boost::move(w)); - //work_queue.push_back(work(boost::move(closure))); // todo check why this doesn't work - } -#else - template - void submit(BOOST_THREAD_FWD_REF(Closure) closure) { work_queue.push_back(work(boost::forward(closure))); } -#endif + /** * \b Requires: This must be called from an scheduled task. * diff --git a/include/boost/thread/executors/executor.hpp b/include/boost/thread/executors/executor.hpp index 3faad3d2..b66b378c 100644 --- a/include/boost/thread/executors/executor.hpp +++ b/include/boost/thread/executors/executor.hpp @@ -32,43 +32,54 @@ namespace boost executor() {} /** - * \b Effects: Destroys the executor. + * \par Effects + * Destroys the executor. * - * \b Synchronization: The completion of all the closures happen before the completion of the executor destructor. + * \par Synchronization + * The completion of all the closures happen before the completion of the executor destructor. */ virtual ~executor() {}; /** - * \b Effects: close the \c executor for submissions. + * \par Effects + * Close the \c executor for submissions. * The worker threads will work until there is no more closures to run. */ virtual void close() = 0; /** - * \b Returns: whether the pool is closed for submissions. + * \par Returns + * Whether the pool is closed for submissions. */ virtual bool closed() = 0; /** - * \b Effects: The specified closure will be scheduled for execution at some point in the future. + * \par Effects + * The specified closure will be scheduled for execution at some point in the future. * If invoked closure throws an exception the executor will call std::terminate, as is the case with threads. * - * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables. + * \par Synchronization + * Ccompletion of closure on a particular thread happens before destruction of thread's thread local variables. * - * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * \par Throws + * \c sync_queue_is_closed if the thread pool is closed. * Whatever exception that can be throw while storing the closure. */ virtual void submit(BOOST_THREAD_RV_REF(work) closure) = 0; /** - * \b Requires: \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible. + * \par Requires + * \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible. * - * \b Effects: The specified closure will be scheduled for execution at some point in the future. + * \par Effects + * The specified closure will be scheduled for execution at some point in the future. * If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads. * - * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables. + * \par Synchronization + * Completion of closure on a particular thread happens before destruction of thread's thread local variables. * - * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * \par Throws + * \c sync_queue_is_closed if the thread pool is closed. * Whatever exception that can be throw while storing the closure. */ @@ -94,16 +105,23 @@ namespace boost } /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. + * \par Effects + * Try to execute one task. + * + * \par Returns + * Whether a task has been executed. + * + * \par Throws + * Whatever the current task constructor throws or the task() throws. */ virtual bool try_executing_one() = 0; /** - * \b Requires: This must be called from an scheduled task. + * \par Requires + * This must be called from an scheduled task. * - * \b Effects: reschedule functions until pred() + * \par Effects + * Reschedule functions until pred() */ template bool reschedule_until(Pred const& pred) @@ -119,7 +137,6 @@ namespace boost } }; - } using executors::executor; } diff --git a/include/boost/thread/executors/executor_adaptor.hpp b/include/boost/thread/executors/executor_adaptor.hpp index 28042300..ef21cf2d 100644 --- a/include/boost/thread/executors/executor_adaptor.hpp +++ b/include/boost/thread/executors/executor_adaptor.hpp @@ -97,39 +97,26 @@ namespace executors */ void submit(BOOST_THREAD_RV_REF(work) closure) { return ex.submit(boost::move(closure)); - //return ex.submit(boost::forward(closure)); + //return ex.submit(boost::forward(closure)); // todo check why this doesn't work } #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) template void submit(Closure & closure) { - work w ((closure)); - submit(boost::move(w)); - //submit(work(closure)); + submit(work(closure)); } #endif void submit(void (*closure)()) { - work w ((closure)); - submit(boost::move(w)); - //submit(work(closure)); + submit(work(closure)); } -#if 0 template void submit(BOOST_THREAD_RV_REF(Closure) closure) - { - work w =boost::move(closure); - submit(boost::move(w)); - } -#else - template - void submit(BOOST_THREAD_FWD_REF(Closure) closure) { submit(work(boost::forward(closure))); } -#endif /** * Effects: try to execute one task. diff --git a/include/boost/thread/executors/generic_executor_ref.hpp b/include/boost/thread/executors/generic_executor_ref.hpp new file mode 100644 index 00000000..f7e09f26 --- /dev/null +++ b/include/boost/thread/executors/generic_executor_ref.hpp @@ -0,0 +1,209 @@ +// Copyright (C) 2014 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_REF_HPP +#define BOOST_THREAD_EXECUTORS_GENERIC_EXECUTOR_REF_HPP + +#include + +#include +#include +#include + +#include + +#include + +namespace boost +{ + namespace executors + { + + template + class executor_ref : public executor + { + Executor& ex; + public: + /// type-erasure to store the works to do + typedef executors::work work; + + /// executor is not copyable. + BOOST_THREAD_NO_COPYABLE(executor_ref) + executor_ref(Executor& ex) : ex(ex) {} + + /** + * \par Effects + * Destroys the executor. + * + * \par Synchronization + * The completion of all the closures happen before the completion of the executor destructor. + */ + ~executor_ref() {}; + + /** + * \par Effects + * Close the \c executor for submissions. + * The worker threads will work until there is no more closures to run. + */ + void close() { ex.close(); } + + /** + * \par Returns + * Whether the pool is closed for submissions. + */ + bool closed() { return ex.closed(); } + + /** + * \par Effects + * The specified closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the executor will call std::terminate, as is the case with threads. + * + * \par Synchronization + * Ccompletion of closure on a particular thread happens before destruction of thread's thread local variables. + * + * \par Throws + * \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + void submit(BOOST_THREAD_RV_REF(work) closure) { + //ex.submit(boost::forward(closure)); // todo check why this doesn't work + ex.submit(boost::move(closure)); + } + + + /** + * \par Effects + * Try to execute one task. + * + * \par Returns + * Whether a task has been executed. + * + * \par Throws + * Whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { return ex.try_executing_one(); } + + }; + + class generic_executor_ref + { + shared_ptr ex; + public: + /// type-erasure to store the works to do + typedef executors::work work; + + template + generic_executor_ref(Executor& ex) + //: ex(make_shared >(ex)) // todo check why this doesn't works with C++03 + : ex( new executor_ref(ex) ) + { + } + + //generic_executor_ref(generic_executor_ref const& other) noexcept {} + //generic_executor_ref& operator=(generic_executor_ref const& other) noexcept {} + + + /** + * \par Effects + * Close the \c executor for submissions. + * The worker threads will work until there is no more closures to run. + */ + void close() { ex->close(); } + + /** + * \par Returns + * Whether the pool is closed for submissions. + */ + bool closed() { return ex->closed(); } + + void submit(BOOST_THREAD_RV_REF(work) closure) + { + ex->submit(boost::forward(closure)); + } + + /** + * \par Requires + * \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible. + * + * \par Effects + * The specified closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads. + * + * \par Synchronization + * Completion of closure on a particular thread happens before destruction of thread's thread local variables. + * + * \par Throws + * \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work w ((closure)); + submit(boost::move(w)); + } +#endif + void submit(void (*closure)()) + { + work w ((closure)); + submit(boost::move(w)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work w = boost::move(closure); + submit(boost::move(w)); + } + +// size_t num_pending_closures() const +// { +// return ex->num_pending_closures(); +// } + + /** + * \par Effects + * Try to execute one task. + * + * \par Returns + * Whether a task has been executed. + * + * \par Throws + * Whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { return ex->try_executing_one(); } + + /** + * \par Requires + * This must be called from an scheduled task. + * + * \par Effects + * reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + //schedule_one_or_yield(); + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + + }; + } + using executors::executor_ref; + using executors::generic_executor_ref; +} + +#include + +#endif diff --git a/include/boost/thread/executors/loop_executor.hpp b/include/boost/thread/executors/loop_executor.hpp index ae84a6a9..a4dd3bab 100644 --- a/include/boost/thread/executors/loop_executor.hpp +++ b/include/boost/thread/executors/loop_executor.hpp @@ -148,24 +148,18 @@ namespace executors template void submit(Closure & closure) { - work w ((closure)); - work_queue.push_back(boost::move(w)); - //work_queue.push(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } #endif void submit(void (*closure)()) { - work w ((closure)); - work_queue.push_back(boost::move(w)); - //work_queue.push_back(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work w =boost::move(closure); - work_queue.push_back(boost::move(w)); - //work_queue.push_back(work(boost::move(closure))); // todo check why this doesn't work + work_queue.push_back(work(boost::forward(closure))); } /** diff --git a/include/boost/thread/executors/serial_executor.hpp b/include/boost/thread/executors/serial_executor.hpp index 5aaaacf9..8f1ea575 100644 --- a/include/boost/thread/executors/serial_executor.hpp +++ b/include/boost/thread/executors/serial_executor.hpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include @@ -34,7 +34,7 @@ namespace executors /// the thread safe work queue sync_queue work_queue; - executor& ex; + generic_executor_ref ex; thread_t thr; struct try_executing_one_task { @@ -48,6 +48,12 @@ namespace executors } }; public: + /** + * \par Returns + * The underlying executor wrapped on a generic executor reference. + */ + generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } + /** * Effects: try to execute one task. * Returns: whether a task has been executed. @@ -118,7 +124,8 @@ namespace executors * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ - serial_executor(executor& ex) + template + serial_executor(Executor& ex) : ex(ex), thr(&serial_executor::worker_thread, this) { } @@ -166,24 +173,18 @@ namespace executors template void submit(Closure & closure) { - work w ((closure)); - work_queue.push_back(boost::move(w)); - //work_queue.push(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } #endif void submit(void (*closure)()) { - work w ((closure)); - work_queue.push_back(boost::move(w)); - //work_queue.push_back(work(closure)); // todo check why this doesn't work + work_queue.push_back(work(closure)); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work w =boost::move(closure); - work_queue.push_back(boost::move(w)); - //work_queue.push_back(work(boost::move(closure))); // todo check why this doesn't work + work_queue.push_back(work(boost::forward(closure))); } /** diff --git a/include/boost/thread/sync_bounded_queue.hpp b/include/boost/thread/sync_bounded_queue.hpp index 3e3958da..7e8c805c 100644 --- a/include/boost/thread/sync_bounded_queue.hpp +++ b/include/boost/thread/sync_bounded_queue.hpp @@ -3,7 +3,7 @@ ////////////////////////////////////////////////////////////////////////////// // -// (C) Copyright Vicente J. Botet Escriba 2013. Distributed under the Boost +// (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost // Software License, Version 1.0. (See accompanying file // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // @@ -16,6 +16,8 @@ #include #include #include +#include + #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD #include #include @@ -25,14 +27,6 @@ namespace boost { - BOOST_SCOPED_ENUM_DECLARE_BEGIN(queue_op_status) - { success = 0, empty, full, closed, busy } - BOOST_SCOPED_ENUM_DECLARE_END(queue_op_status) - -#ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD - struct no_block_tag{}; - BOOST_CONSTEXPR_OR_CONST no_block_tag no_block = {}; -#endif struct sync_queue_is_closed : std::exception { diff --git a/include/boost/thread/sync_queue.hpp b/include/boost/thread/sync_queue.hpp index 489cd252..c5704e2d 100644 --- a/include/boost/thread/sync_queue.hpp +++ b/include/boost/thread/sync_queue.hpp @@ -3,7 +3,7 @@ ////////////////////////////////////////////////////////////////////////////// // -// (C) Copyright Vicente J. Botet Escriba 2013. Distributed under the Boost +// (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost // Software License, Version 1.0. (See accompanying file // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // @@ -15,6 +15,8 @@ #include #include #include +#include + #include #include #include diff --git a/test/Jamfile.v2 b/test/Jamfile.v2 index 4fd56c11..1092a2f0 100644 --- a/test/Jamfile.v2 +++ b/test/Jamfile.v2 @@ -849,6 +849,8 @@ rule thread-compile ( sources : reqs * : name ) #[ thread-run test_9720.cpp ] #[ thread-run test_10125.cpp ] #[ thread-run test_10128.cpp ] + #[ thread-run2 ../example/producer_consumer2.cpp : ex_producer_consumer2 ] + [ thread-run2 ../example/generic_executor_ref.cpp : generic_executor_ref ] ; }