diff --git a/example/executor.cpp b/example/executor.cpp index 7f6b1ce3..f8e9eb14 100644 --- a/example/executor.cpp +++ b/example/executor.cpp @@ -98,6 +98,10 @@ int test_executor_adaptor() submit_some(ea); } // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::loop_executor e1; + boost::loop_executor e2 = e1; + } { boost::executor_adaptor < boost::loop_executor > ea2; submit_some( ea2); @@ -112,11 +116,19 @@ int test_executor_adaptor() } #endif // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::inline_executor e1; + boost::inline_executor e2 = e1; + } { boost::executor_adaptor < boost::inline_executor > ea1; submit_some(ea1); } // std::cout << BOOST_CONTEXTOF << std::endl; + { + boost::thread_executor e1; + boost::thread_executor e2 = e1; + } { boost::executor_adaptor < boost::thread_executor > ea1; submit_some(ea1); diff --git a/include/boost/thread/executors/inline_executor.hpp b/include/boost/thread/executors/inline_executor.hpp index 5dd52318..c975bd83 100644 --- a/include/boost/thread/executors/inline_executor.hpp +++ b/include/boost/thread/executors/inline_executor.hpp @@ -25,142 +25,227 @@ namespace executors public: /// type-erasure to store the works to do typedef executors::work work; - bool closed_; - mutable mutex mtx_; - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - return false; - } + private: + struct shared_state { + typedef executors::work work; + + bool closed_; + mutable mutex mtx_; + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return false; + } + + /// shared_state is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + shared_state() + : closed_(false) + { + } + /** + * \b Effects: Destroys the inline executor. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + } + + /** + * \b Effects: close the \c inline_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed(lock_guard& ) const + { + return closed_; + } + bool closed() const + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + #endif + void submit(void (*closure)()) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + } + try + { + closure(); + } + catch (...) + { + std::terminate(); + return; + } + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& ) + { + return false; + } + }; public: - /// inline_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(inline_executor) + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + inline_executor() + : pimpl(make_shared()) + { + } - /** - * \b Effects: creates a inline executor that runs closures immediately. - * - * \b Throws: Nothing. - */ - inline_executor() - : closed_(false) - { - } - /** - * \b Effects: Destroys the inline executor. - * - * \b Synchronization: The completion of all the closures happen before the completion of the \c inline_executor destructor. - */ - ~inline_executor() - { - // signal to all the worker thread that there will be no more submissions. - close(); - } + /** + * \b Effects: close the \c inline_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + pimpl->close(); + } - /** - * \b Effects: close the \c inline_executor for submissions. - * The loop will work until there is no more closures to run. - */ - void close() - { - lock_guard lk(mtx_); - closed_ = true; - } + /** + * \b Returns: whether the executor is closed for submissions. + */ + bool closed() const + { + return pimpl->closed(); + } - /** - * \b Returns: whether the pool is closed for submissions. - */ - bool closed(lock_guard& ) - { - return closed_; - } - bool closed() - { - lock_guard lk(mtx_); - return closed(lk); - } - - /** - * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. - * - * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. - * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. - * - * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. - * - * \b Throws: \c sync_queue_is_closed if the thread pool is closed. - * Whatever exception that can be throw while storing the closure. - */ + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c inline_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) - template - void submit(Closure & closure) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + template + void submit(Closure & closure) + { + pimpl->submit(closure); + } #endif - void submit(void (*closure)()) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + void submit(void (*closure)()) + { + pimpl->submit(closure); + } - template - void submit(BOOST_THREAD_FWD_REF(Closure) closure) - { - { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - } - try - { - closure(); - } - catch (...) - { - std::terminate(); - return; - } - } + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + pimpl->submit(boost::forward(closure)); + } - /** - * \b Requires: This must be called from an scheduled task. - * - * \b Effects: reschedule functions until pred() - */ - template - bool reschedule_until(Pred const& ) - { - return false; - } + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); + } + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& p) + { + return pimpl->reschedule_until(p); + } + private: + shared_ptr pimpl; }; } using executors::inline_executor; diff --git a/include/boost/thread/executors/loop_executor.hpp b/include/boost/thread/executors/loop_executor.hpp index e9eadadf..a8ca3758 100644 --- a/include/boost/thread/executors/loop_executor.hpp +++ b/include/boost/thread/executors/loop_executor.hpp @@ -30,59 +30,170 @@ namespace executors /// type-erasure to store the works to do typedef executors::work work; private: - /// the thread safe work queue - concurrent::sync_queue work_queue; - public: - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - work task; - try + struct shared_state { + typedef executors::work work; + /// the thread safe work queue + concurrent::sync_queue work_queue; + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() { - if (work_queue.try_pull(task) == queue_op_status::success) + work task; + try { + if (work_queue.try_pull(task) == queue_op_status::success) + { + task(); + return true; + } + return false; + } + catch (...) + { + std::terminate(); + return false; + } + } + /** + * Effects: schedule one task or yields + * Throws: whatever the current task constructor throws or the task() throws. + */ + void schedule_one_or_yield() + { + if ( ! try_executing_one()) + { + this_thread::yield(); + } + } + + /// loop_executor is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + shared_state() + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + } + + /** + * The main loop of the worker thread + */ + void loop() + { + while (!closed()) + { + schedule_one_or_yield(); + } + while (try_executing_one()) + { + } + } + + /** + * \b Effects: close the \c loop_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work_queue.push(work(closure)); + } + #endif + void submit(void (*closure)()) + { + work_queue.push(work(closure)); + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work_queue.push(work(boost::forward(closure))); + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + + /** + * run queued closures + */ + void run_queued_closures() + { + sync_queue::underlying_queue_type q = work_queue.underlying_queue(); + while (! q.empty()) + { + work& task = q.front(); task(); - return true; + q.pop_front(); } - return false; } - catch (...) - { - std::terminate(); - return false; - } - } - private: - /** - * Effects: schedule one task or yields - * Throws: whatever the current task constructor throws or the task() throws. - */ - void schedule_one_or_yield() - { - if ( ! try_executing_one()) - { - this_thread::yield(); - } - } - - - + }; public: - /// loop_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(loop_executor) - /** * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. * * \b Throws: Whatever exception is thrown while initializing the needed resources. */ loop_executor() + : pimpl(make_shared()) { } /** @@ -92,22 +203,33 @@ namespace executors */ ~loop_executor() { - // signal to all the worker thread that there will be no more submissions. - close(); } + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); + } +// /** +// * Effects: schedule one task or yields +// * Throws: whatever the current task constructor throws or the task() throws. +// */ +// void schedule_one_or_yield() +// { +// return pimpl->schedule_one_or_yield(); +// } + + /** * The main loop of the worker thread */ void loop() { - while (!closed()) - { - schedule_one_or_yield(); - } - while (try_executing_one()) - { - } + pimpl->loop(); } /** @@ -116,7 +238,7 @@ namespace executors */ void close() { - work_queue.close(); + pimpl->close(); } /** @@ -124,7 +246,7 @@ namespace executors */ bool closed() { - return work_queue.closed(); + return pimpl->closed(); } /** @@ -143,18 +265,18 @@ namespace executors template void submit(Closure & closure) { - work_queue.push(work(closure)); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - work_queue.push(work(closure)); + pimpl->submit(closure); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - work_queue.push(work(boost::forward(closure))); + pimpl->submit(boost::forward(closure)); } /** @@ -165,13 +287,7 @@ namespace executors template bool reschedule_until(Pred const& pred) { - do { - if ( ! try_executing_one()) - { - return false; - } - } while (! pred()); - return true; + return pimpl->reschedule_until(pred); } /** @@ -179,15 +295,10 @@ namespace executors */ void run_queued_closures() { - sync_queue::underlying_queue_type q = work_queue.underlying_queue(); - while (! q.empty()) - { - work& task = q.front(); - task(); - q.pop_front(); - } + pimpl->run_queued_closures(); } - + private: + shared_ptr pimpl; }; } using executors::loop_executor; diff --git a/include/boost/thread/executors/thread_executor.hpp b/include/boost/thread/executors/thread_executor.hpp index a8cd5c21..eb57adfa 100644 --- a/include/boost/thread/executors/thread_executor.hpp +++ b/include/boost/thread/executors/thread_executor.hpp @@ -29,33 +29,134 @@ namespace executors public: /// type-erasure to store the works to do typedef executors::work work; - bool closed_; - typedef scoped_thread<> thread_t; - typedef csbl::vector threads_type; - threads_type threads_; - mutable mutex mtx_; + private: - /** - * Effects: try to execute one task. - * Returns: whether a task has been executed. - * Throws: whatever the current task constructor throws or the task() throws. - */ - bool try_executing_one() - { - return false; - } + struct shared_state { + typedef executors::work work; + bool closed_; + typedef scoped_thread<> thread_t; + typedef csbl::vector threads_type; + threads_type threads_; + mutable mutex mtx_; + /// thread_executor is not copyable. + BOOST_THREAD_NO_COPYABLE(shared_state) + + /** + * \b Effects: creates a inline executor that runs closures immediately. + * + * \b Throws: Nothing. + */ + shared_state() + : closed_(false) + { + } + /** + * \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor. + */ + ~shared_state() + { + // signal to all the worker thread that there will be no more submissions. + close(); + // all the scoped threads will join before destroying + } + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return false; + } + + /** + * \b Effects: close the \c thread_executor for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + lock_guard lk(mtx_); + closed_ = true; + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed(lock_guard& ) + { + return closed_; + } + bool closed() + { + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c thread_executor will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + + #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(closure); + threads_.push_back(thread_t(boost::move(th))); + } + #endif + void submit(void (*closure)()) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(closure); + threads_.push_back(thread_t(boost::move(th))); + } + + template + void submit(BOOST_THREAD_FWD_REF(Closure) closure) + { + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + threads_.reserve(threads_.size() + 1); + thread th(boost::forward(closure)); + threads_.push_back(thread_t(boost::move(th))); + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const&) + { + return false; + } + }; public: - /// thread_executor is not copyable. - BOOST_THREAD_NO_COPYABLE(thread_executor) - /** * \b Effects: creates a inline executor that runs closures immediately. * * \b Throws: Nothing. */ thread_executor() - : closed_(false) + : pimpl(make_shared()) { } /** @@ -65,9 +166,16 @@ namespace executors */ ~thread_executor() { - // signal to all the worker thread that there will be no more submissions. - close(); - // all the scoped threads will join before destroying + } + + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + return pimpl->try_executing_one(); } /** @@ -76,21 +184,15 @@ namespace executors */ void close() { - lock_guard lk(mtx_); - closed_ = true; + pimpl->close(); } /** * \b Returns: whether the pool is closed for submissions. */ - bool closed(lock_guard& ) - { - return closed_; - } bool closed() { - lock_guard lk(mtx_); - return closed(lk); + return pimpl->closed(); } /** @@ -109,30 +211,18 @@ namespace executors template void submit(Closure & closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(closure); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(closure); } #endif void submit(void (*closure)()) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(closure); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(closure); } template void submit(BOOST_THREAD_FWD_REF(Closure) closure) { - lock_guard lk(mtx_); - if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); - threads_.reserve(threads_.size() + 1); - thread th(boost::forward(closure)); - threads_.push_back(thread_t(boost::move(th))); + pimpl->submit(boost::forward(closure)); } /** @@ -141,12 +231,14 @@ namespace executors * \b Effects: reschedule functions until pred() */ template - bool reschedule_until(Pred const&) + bool reschedule_until(Pred const& p) { - return false; + return pimpl->reschedule_until(p); } - + private: + shared_ptr pimpl; }; + } using executors::thread_executor; }