diff --git a/include/boost/thread/executors/serial_executor_cont.hpp b/include/boost/thread/executors/serial_executor_cont.hpp index b360c947..d6b2238c 100644 --- a/include/boost/thread/executors/serial_executor_cont.hpp +++ b/include/boost/thread/executors/serial_executor_cont.hpp @@ -31,41 +31,57 @@ namespace executors typedef executors::work work; private: - /// the thread safe work queue - concurrent::sync_queue work_queue; - generic_executor_ref ex; - future fut; + generic_executor_ref ex_; + future fut_; // protected by mtx_ + bool closed_; // protected by mtx_ + mutex mtx_; struct continuation { work task; - continuation(work&& tsk) + template + struct result { + typedef void type; + }; + continuation(BOOST_THREAD_RV_REF(work) tsk) : task(boost::move(tsk)) {} void operator()(future f) { try { task(); } catch (...) { + std::terminate(); } } }; + + bool closed(lock_guard&) const + { + return closed_; + } public: /** * \par Returns * The underlying executor wrapped on a generic executor reference. */ - generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; } + generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex_; } /// serial_executor is not copyable. BOOST_THREAD_NO_COPYABLE(serial_executor) /** - * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * \b Effects: creates a serial executor that runs closures in fifo order using one the associated executor. * * \b Throws: Whatever exception is thrown while initializing the needed resources. + * + * \b Notes: + * * The lifetime of the associated executor must outlive the serial executor. + * * The current implementation doesn't support submission from synchronous continuation, that is, + * - the executor must execute the continuation asynchronously or + * - the continuation can not submit to this serial executor. */ template serial_executor(Executor& ex) - : ex(ex), fut(make_ready_future()) + : ex_(ex), fut_(make_ready_future()), closed_(false) { } /** @@ -85,7 +101,8 @@ namespace executors */ void close() { - work_queue.close(); + lock_guard lk(mtx_); + closed_ = true;; } /** @@ -93,37 +110,54 @@ namespace executors */ bool closed() { - return work_queue.closed(); + lock_guard lk(mtx_); + return closed(lk); + } + + /** + * Effects: none. + * Returns: always false. + * Throws: No. + * Remark: A serial executor can not execute one of its pending tasks as the tasks depends on the other tasks. + */ + bool try_executing_one() + { + return false; } /** * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. * - * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. - * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. + * \b Effects: The specified \c closure will be scheduled for execution after the last submitted closure finish. + * If the invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads. * - * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. - * - * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * \b Throws: \c sync_queue_is_closed if the executor is closed. * Whatever exception that can be throw while storing the closure. + * */ #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) template void submit(Closure & closure) { - fut = fut.then(ex, continuation(work(closure))); + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); } #endif void submit(void (*closure)()) { - fut = fut.then(ex, continuation(work(closure))); + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(closure))); } template void submit(BOOST_THREAD_RV_REF(Closure) closure) { - fut = fut.then(ex, continuation(work(boost::forward(closure)))); + lock_guard lk(mtx_); + if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); + fut_ = fut_.then(ex_, continuation(work(boost::forward(closure)))); } };