diff --git a/example/executor.cpp b/example/executor.cpp index 4a9a83a8..8d1f0392 100644 --- a/example/executor.cpp +++ b/example/executor.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -25,109 +26,33 @@ void p2() << boost::this_thread::get_id() << " P2" << BOOST_THREAD_END_LOG; } -void push(boost::csbl::deque &data_, BOOST_THREAD_RV_REF(boost::thread_detail::work) closure) -{ - try - { - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure)); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " -void submit(boost::csbl::deque &data_, BOOST_THREAD_FWD_REF(Closure) closure) -{ - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure))); - boost::thread_detail::work v =boost::forward(closure); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " data_; - data_.push_back(boost::move(f)); - data_.push_back(boost::thread_detail::work(&p1)); - submit(data_, &p1); - } - catch (std::exception& ex) - { - BOOST_THREAD_LOG - << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG; - } - catch (...) - { - BOOST_THREAD_LOG - << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG; - } - - typedef boost::csbl::vector thread_vector; - thread_vector threads; - - } -#endif -#if 1 { try { boost::executor_adaptor ea; - boost::executor &tp=ea; - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " ea2; + submit_some(ea2); + ea2.underlying_executor().run_queued_closures(); + } catch (std::exception& ex) { @@ -142,7 +67,6 @@ int main() return 2; } } -#endif BOOST_THREAD_LOG << boost::this_thread::get_id() << "MAIN>" << BOOST_THREAD_END_LOG; return 0; diff --git a/example/thread_pool.cpp b/example/thread_pool.cpp index 6b59878f..dfc2bddc 100644 --- a/example/thread_pool.cpp +++ b/example/thread_pool.cpp @@ -24,108 +24,29 @@ void p2() << boost::this_thread::get_id() << " P2" << BOOST_THREAD_END_LOG; } -void push(boost::csbl::deque &data_, BOOST_THREAD_RV_REF(boost::thread_detail::work) closure) -{ - try - { - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure)); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " -void submit(boost::csbl::deque &data_, BOOST_THREAD_FWD_REF(Closure) closure) -{ - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " (closure))); - boost::thread_detail::work v =boost::forward(closure); - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " data_; - data_.push_back(boost::move(f)); - data_.push_back(boost::thread_detail::work(&p1)); - submit(data_, &p1); - } - catch (std::exception& ex) - { - BOOST_THREAD_LOG - << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG; - } - catch (...) - { - BOOST_THREAD_LOG - << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG; - } - - typedef boost::csbl::vector thread_vector; - thread_vector threads; - - } -#endif -#if 1 { try { boost::thread_pool tp; - BOOST_THREAD_LOG - << boost::this_thread::get_id() << " " << BOOST_THREAD_END_LOG; return 0; diff --git a/example/user_scheduler.cpp b/example/user_scheduler.cpp new file mode 100644 index 00000000..214792c5 --- /dev/null +++ b/example/user_scheduler.cpp @@ -0,0 +1,70 @@ +// Copyright (C) 2013 Vicente Botet +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#define BOOST_THREAD_VERSION 4 +#define BOOST_THREAD_USES_LOG +#define BOOST_THREAD_USES_LOG_THREAD_ID + +#include +#include +#include +#include + +void p1() +{ + BOOST_THREAD_LOG + << boost::this_thread::get_id() << " P1" << BOOST_THREAD_END_LOG; +} + +void p2() +{ + BOOST_THREAD_LOG + << boost::this_thread::get_id() << " P2" << BOOST_THREAD_END_LOG; +} + +void submit_some(boost::user_scheduler& tp) { + tp.submit(&p1); + tp.submit(&p2); + tp.submit(&p1); + tp.submit(&p2); + tp.submit(&p1); + tp.submit(&p2); + tp.submit(&p1); + tp.submit(&p2); + tp.submit(&p1); + tp.submit(&p2); +} + +int main() +{ + BOOST_THREAD_LOG + << boost::this_thread::get_id() << " " << BOOST_THREAD_END_LOG; + return 0; +} diff --git a/include/boost/thread/executor.hpp b/include/boost/thread/executor.hpp index 195d250f..1bed6162 100644 --- a/include/boost/thread/executor.hpp +++ b/include/boost/thread/executor.hpp @@ -144,6 +144,8 @@ namespace boost template executor_adaptor(BOOST_THREAD_FWD_REF(A1) a1) : ex(boost::forward(a1)) {} #endif + Executor& underlying_executor() { return ex; } + /** * \b Effects: close the \c executor for submissions. * The worker threads will work until there is no more closures to run. diff --git a/include/boost/thread/sync_queue.hpp b/include/boost/thread/sync_queue.hpp index bb90209d..c94745a9 100644 --- a/include/boost/thread/sync_queue.hpp +++ b/include/boost/thread/sync_queue.hpp @@ -33,6 +33,7 @@ namespace boost { public: typedef ValueType value_type; + typedef csbl::deque underlying_queue_type; typedef std::size_t size_type; // Constructors/Assignment/Destructors @@ -68,12 +69,16 @@ namespace boost inline bool try_pull(value_type&); inline bool try_pull(no_block_tag,value_type&); inline shared_ptr try_pull(); + inline underlying_queue_type underlying_queue() { + lock_guard lk(mtx_); + return boost::move(data_); + } private: mutable mutex mtx_; condition_variable not_empty_; size_type waiting_empty_; - csbl::deque data_; + underlying_queue_type data_; bool closed_; inline bool empty(unique_lock& ) const BOOST_NOEXCEPT diff --git a/include/boost/thread/thread_pool.hpp b/include/boost/thread/thread_pool.hpp index 0e811403..291ab425 100644 --- a/include/boost/thread/thread_pool.hpp +++ b/include/boost/thread/thread_pool.hpp @@ -16,28 +16,13 @@ #include #include #include - - -//#ifdef BOOST_NO_CXX11_HDR_FUNCTIONAL -//#include -//#else -//#include -//#endif - #include -//#if defined BOOST_NO_CXX11_RVALUE_REFERENCES -//#include -//#else -//#include -//#endif - #include namespace boost { - class thread_pool { /// type-erasure to store the works to do diff --git a/include/boost/thread/user_scheduler.hpp b/include/boost/thread/user_scheduler.hpp new file mode 100644 index 00000000..c401431c --- /dev/null +++ b/include/boost/thread/user_scheduler.hpp @@ -0,0 +1,202 @@ +// Copyright (C) 2013 Vicente J. Botet Escriba +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// 2013/11 Vicente J. Botet Escriba +// first implementation of a simple serial scheduler. + +#ifndef BOOST_THREAD_USER_SCHEDULER_HPP +#define BOOST_THREAD_USER_SCHEDULER_HPP + +#include +#include +#include +#include +#include + +#include + +namespace boost +{ + + class user_scheduler + { + /// type-erasure to store the works to do + typedef thread_detail::work work; + + /// the thread safe work queue + sync_queue work_queue; + + public: + /** + * Effects: try to execute one task. + * Returns: whether a task has been executed. + * Throws: whatever the current task constructor throws or the task() throws. + */ + bool try_executing_one() + { + work task; + try + { + if (work_queue.try_pull(task)) + { + task(); + return true; + } + return false; + } + catch (std::exception& ) + { + return false; + } + catch (...) + { + return false; + } + } + private: + /** + * Effects: schedule one task or yields + * Throws: whatever the current task constructor throws or the task() throws. + */ + void schedule_one_or_yield() + { + if ( ! try_executing_one()) + { + this_thread::yield(); + } + } + + + /** + * The main loop of the worker thread + */ + void worker_thread() + { + while (!closed()) + { + schedule_one_or_yield(); + } + while (try_executing_one()) + { + } + } + + public: + /// user_scheduler is not copyable. + BOOST_THREAD_NO_COPYABLE(user_scheduler) + + /** + * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods. + * + * \b Throws: Whatever exception is thrown while initializing the needed resources. + */ + user_scheduler() + { + } + /** + * \b Effects: Destroys the thread pool. + * + * \b Synchronization: The completion of all the closures happen before the completion of the \c user_scheduler destructor. + */ + ~user_scheduler() + { + // signal to all the worker thread that there will be no more submissions. + close(); + } + + /** + * loop + */ + void loop() { worker_thread(); } + /** + * \b Effects: close the \c user_scheduler for submissions. + * The loop will work until there is no more closures to run. + */ + void close() + { + work_queue.close(); + } + + /** + * \b Returns: whether the pool is closed for submissions. + */ + bool closed() + { + return work_queue.closed(); + } + + /** + * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible. + * + * \b Effects: The specified \c closure will be scheduled for execution at some point in the future. + * If invoked closure throws an exception the \c user_scheduler will call \c std::terminate, as is the case with threads. + * + * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables. + * + * \b Throws: \c sync_queue_is_closed if the thread pool is closed. + * Whatever exception that can be throw while storing the closure. + */ + +#if defined(BOOST_NO_CXX11_RVALUE_REFERENCES) + template + void submit(Closure & closure) + { + work w ((closure)); + work_queue.push(boost::move(w)); + //work_queue.push(work(closure)); // todo check why this doesn't work + } +#endif + void submit(void (*closure)()) + { + work w ((closure)); + work_queue.push(boost::move(w)); + //work_queue.push(work(closure)); // todo check why this doesn't work + } + + template + void submit(BOOST_THREAD_RV_REF(Closure) closure) + { + work w =boost::move(closure); + work_queue.push(boost::move(w)); + //work_queue.push(work(boost::move(closure))); // todo check why this doesn't work + } + + /** + * \b Requires: This must be called from an scheduled task. + * + * \b Effects: reschedule functions until pred() + */ + template + bool reschedule_until(Pred const& pred) + { + do { + if ( ! try_executing_one()) + { + return false; + } + } while (! pred()); + return true; + } + /** + * run queued closures + */ + void run_queued_closures() + { + sync_queue::underlying_queue_type q = work_queue.underlying_queue(); + while (q.empty()) + { + work task = q.front(); + q.pop_front(); + task(); + } + } + + }; + +} + +#include + +#endif diff --git a/test/Jamfile.v2 b/test/Jamfile.v2 index 6c4e31a5..269c851a 100644 --- a/test/Jamfile.v2 +++ b/test/Jamfile.v2 @@ -721,6 +721,7 @@ rule thread-compile ( sources : reqs * : name ) [ thread-run2 ../example/lambda_future.cpp : ex_lambda_future ] [ thread-run2 ../example/not_interleaved2.cpp : ex_not_interleaved2 ] [ thread-run2 ../example/thread_pool.cpp : ex_thread_pool ] + [ thread-run2 ../example/user_scheduler.cpp : ex_user_scheduler ] [ thread-run2 ../example/executor.cpp : ex_executor ] [ thread-run2 ../example/future_when_all.cpp : future_when_all ] @@ -813,6 +814,7 @@ rule thread-compile ( sources : reqs * : name ) #[ thread-run ../example/std_async_test.cpp ] #[ compile virtual_noexcept.cpp ] #[ thread-run clang_main.cpp ] + #[ thread-run test_9303.cpp ] ; }