From 8ce4eeca24e034b721099923b0f97cd52f6a2779 Mon Sep 17 00:00:00 2001 From: "William E. Kempf" Date: Tue, 3 Jul 2001 18:28:10 +0000 Subject: [PATCH] Changed boost::thread to use a noncopyable design. [SVN r10518] --- example/monitor/monitor.cpp | 8 +- example/starvephil/starvephil.cpp | 21 +- example/tennis/tennis.cpp | 8 +- src/thread.cpp | 471 ++++++++++++++++-------------- test/test_thread.cpp | 20 +- 5 files changed, 281 insertions(+), 247 deletions(-) diff --git a/example/monitor/monitor.cpp b/example/monitor/monitor.cpp index d160c354..00a0c59e 100644 --- a/example/monitor/monitor.cpp +++ b/example/monitor/monitor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace { const int ITERS = 100; @@ -83,9 +84,10 @@ void do_test(M* dummy=0) { typedef buffer_t buffer_type; buffer_type::get_buffer(); - boost::thread::create(&buffer_type::do_sender_thread); - boost::thread::create(&buffer_type::do_receiver_thread); - boost::thread::join_all(); + boost::thread thrd1(&buffer_type::do_sender_thread); + boost::thread thrd2(&buffer_type::do_receiver_thread); + thrd1.join(); + thrd2.join(); } void test_buffer() diff --git a/example/starvephil/starvephil.cpp b/example/starvephil/starvephil.cpp index 4589ad24..0ea755c7 100644 --- a/example/starvephil/starvephil.cpp +++ b/example/starvephil/starvephil.cpp @@ -151,13 +151,20 @@ private: int main(int argc, char* argv[]) { - boost::thread::create(&chef); + boost::thread thrd_chef(&chef); phil p[] = { phil(0), phil(1), phil(2), phil(3), phil(4) }; - boost::thread::create(thread_adapter(&phil::do_thread, &p[0])); - boost::thread::create(thread_adapter(&phil::do_thread, &p[1])); - boost::thread::create(thread_adapter(&phil::do_thread, &p[2])); - boost::thread::create(thread_adapter(&phil::do_thread, &p[3])); - boost::thread::create(thread_adapter(&phil::do_thread, &p[4])); - boost::thread::join_all(); + boost::thread thrd_phil0(thread_adapter(&phil::do_thread, &p[0])); + boost::thread thrd_phil1(thread_adapter(&phil::do_thread, &p[1])); + boost::thread thrd_phil2(thread_adapter(&phil::do_thread, &p[2])); + boost::thread thrd_phil3(thread_adapter(&phil::do_thread, &p[3])); + boost::thread thrd_phil4(thread_adapter(&phil::do_thread, &p[4])); + + thrd_chef.join(); + thrd_phil0.join(); + thrd_phil1.join(); + thrd_phil2.join(); + thrd_phil3.join(); + thrd_phil4.join(); + return 0; } diff --git a/example/tennis/tennis.cpp b/example/tennis/tennis.cpp index b883cd5f..54ae3214 100644 --- a/example/tennis/tennis.cpp +++ b/example/tennis/tennis.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -81,8 +82,8 @@ int main(int argc, char* argv[]) { state = START; - boost::thread::create(thread_adapter(&player, (void*)PLAYER_A)); - boost::thread::create(thread_adapter(&player, (void*)PLAYER_B)); + boost::thread thrda(thread_adapter(&player, (void*)PLAYER_A)); + boost::thread thrdb(thread_adapter(&player, (void*)PLAYER_B)); boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); @@ -109,7 +110,8 @@ int main(int argc, char* argv[]) std::cout << "GAME OVER" << std::endl; - boost::thread::join_all(); + thrda.join(); + thrdb.join(); return 0; } \ No newline at end of file diff --git a/src/thread.cpp b/src/thread.cpp index 943763bb..7172d6fa 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -13,286 +13,252 @@ * Revision History (excluding minor changes for specific compilers) * 8 Feb 01 Initial version. * 1 Jun 01 Added boost::thread initial implementation. + * 3 Jul 01 Redesigned boost::thread to be noncopyable. */ #include -#include -#include #include -#include #include #if defined(BOOST_HAS_WINTHREADS) # include # include -#elif defined(BOOST_HAS_PTHREADS) -# include #endif #include "timeconv.inl" -namespace +#if defined(BOOST_HAS_PTHREADS) +namespace boost { - class thread_counter + // This class is used to signal thread objects when the thread dies. + class thread::thread_list { public: - thread_counter() : _threads(0) { } - - void start() + thread_list() { } + ~thread_list() { - boost::mutex::lock lock(_mutex); - ++_threads; + mutex::lock lock(m_mutex); + for (std::list::iterator it = m_list.begin(); it != m_list.end(); ++it) + { + mutex::lock lock((*it)->m_mutex); + (*it)->m_list = 0; + (*it)->m_cond.notify_all(); + } } - void stop() + void add(thread* thrd) { - boost::mutex::lock lock(_mutex); - if (--_threads == 0) - _cond.notify_all(); + mutex::lock lock(m_mutex); + m_list.push_back(thrd); } - void wait() + void remove(thread* thrd) { - boost::mutex::lock lock(_mutex); - while (_threads != 0) - _cond.wait(lock); + mutex::lock lock(m_mutex); + std::list::iterator it = std::find(m_list.begin(), m_list.end(), thrd); + if (it != m_list.end()) + m_list.erase(it); } private: - unsigned long _threads; - boost::mutex _mutex; - boost::condition _cond; + std::list m_list; + mutex m_mutex; + }; +} +#endif + +namespace +{ +#if defined(BOOST_HAS_PTHREADS) + pthread_key_t key; + pthread_once_t once = PTHREAD_ONCE_INIT; + + void destroy_list(void* p) + { + boost::thread::thread_list* list = static_cast(p); + delete list; + } + + void init_key() + { + int res = pthread_key_create(&key, &destroy_list); + assert(res == 0); + } + + pthread_key_t get_key() + { + int res = pthread_once(&once, &init_key); + assert(res == 0); + return key; + } + + boost::thread::thread_list* get_list() + { + pthread_key_t key = get_key(); + boost::thread::thread_list* list = static_cast(pthread_getspecific(key)); + if (!list) + { + list = new boost::thread::thread_list; + pthread_setspecific(key, list); + } + return list; + } +#endif + + class thread_param + { + public: + thread_param(const boost::function0& threadfunc) : m_threadfunc(threadfunc), m_started(false) { } + void wait() + { + boost::mutex::lock lock(m_mutex); + while (!m_started) + m_cond.wait(lock); + } + void started() + { + boost::mutex::lock lock(m_mutex); + m_started = true; + m_cond.notify_one(); + } + + boost::mutex m_mutex; + boost::condition m_cond; + const boost::function0& m_threadfunc; + bool m_started; +#if defined(BOOST_HAS_PTHREADS) + boost::thread::thread_list* m_list; +#endif }; - thread_counter threads; - boost::tss tss_state; +#if defined(BOOST_HAS_WINTHREADS) + unsigned __stdcall thread_proxy(void* param) +#elif defined(BOOST_HAS_PTHREADS) + void* thread_proxy(void* param) +#endif + { + thread_param* p = static_cast(param); + boost::function0 threadfunc = p->m_threadfunc; +#if defined(BOOST_HAS_PTHREADS) + p->m_list = get_list(); // create the list +#endif + p->started(); + threadfunc(); + return 0; + } } namespace boost { - namespace detail - { - class thread_state - { - enum - { - creating, - created, - started, - finished - }; - - public: - thread_state(); - ~thread_state(); - - void add_ref(); - void release(); - bool is_alive(); - void join(); - -#if defined(BOOST_HAS_WINTHREADS) - static unsigned __stdcall thread_proc(void* param); -#elif defined(BOOST_HAS_PTHREADS) - static void* thread_proc(void* param); -#endif - - static thread_state* create(const boost::detail::threadfunc& func); - - private: - unsigned long _refs; - mutex _mutex; - condition _cond; - int _state; - threadfunc _func; -#if defined(BOOST_HAS_WINTHREADS) - HANDLE _thread; -#elif defined(BOOST_HAS_PTHREADS) - pthread_t _thread; -#endif - - // This line illustrate the internal compiler error encountered on MSVC -// boost::function _function; - }; - - thread_state::thread_state() - : _state(creating), _refs(2) // Both created thread and creating thread own at first - { - } - - thread_state::~thread_state() - { - if (_state == finished) - { -#if defined(BOOST_HAS_WINTHREADS) - int res = CloseHandle(_thread); - assert(res); -#elif defined(BOOST_HAS_PTHREADS) - int res = pthread_detach(_thread); - assert(res == 0); -#endif - } - } - - void thread_state::add_ref() - { - mutex::lock lock(_mutex); - ++_refs; - } - - void thread_state::release() - { - bool del = false; - { - mutex::lock lock(_mutex); - del = (--_refs == 0); - } - if (del) delete this; - } - - bool thread_state::is_alive() - { - mutex::lock lock(_mutex); - return _state == started; - } - - void thread_state::join() - { - mutex::lock lock(_mutex); - while (_state != finished) - _cond.wait(lock); - } - -#if defined(BOOST_HAS_WINTHREADS) - unsigned __stdcall thread_state::thread_proc(void* param) -#elif defined(BOOST_HAS_PTHREADS) - void* thread_state::thread_proc(void* param) -#endif - { - thread_state* state = static_cast(param); - assert(state); - - tss_state.set(state); - - { - mutex::lock lock(state->_mutex); - - while (state->_state != created) - state->_cond.wait(lock); - - state->_state = started; - state->_cond.notify_all(); - threads.start(); - } - - try - { - state->_func(); - } - catch (...) - { - } - - { - mutex::lock lock(state->_mutex); - state->_state = finished; - state->_cond.notify_all(); - } - - state->release(); - threads.stop(); - - return 0; - } - - thread_state* thread_state::create(const boost::detail::threadfunc& func) - { - thread_state* state = new thread_state(); - mutex::lock lock(state->_mutex); - - assert(func); - state->_func = func; - -#if defined(BOOST_HAS_WINTHREADS) - unsigned id; - state->_thread = (HANDLE)_beginthreadex(0, 0, &thread_proc, state, 0, &id); - assert(state->_thread); - - if (state->_thread == 0) - { - delete state; - return 0; - } -#elif defined(BOOST_HAS_PTHREADS) - int res = pthread_create(&state->_thread, 0, &thread_proc, state); - assert(res == 0); - - if (res != 0) - { - delete state; - return 0; - } -#endif - - state->_state = created; - state->_cond.notify_all(); - - while (state->_state != started) - state->_cond.wait(lock); - - return state; - } - } - lock_error::lock_error() : std::runtime_error("thread lock error") { } - thread::thread(const thread& other) - : _state(other._state) + thread::thread() { - if (_state) - _state->add_ref(); +#if defined(BOOST_HAS_WINTHREADS) + HANDLE cur = GetCurrentThread(); + HANDLE real; + DuplicateHandle(GetCurrentProcess(), cur, GetCurrentProcess(), &real, 0, FALSE, DUPLICATE_SAME_ACCESS); + m_thread = reinterpret_cast(real); + m_id = GetCurrentThreadId(); +#elif defined(BOOST_HAS_PTHREADS) + m_thread = pthread_self(); + m_list = get_list(); + m_list->add(this); +#endif + } + + thread::thread(const boost::function0& threadfunc) + { + thread_param param(threadfunc); +#if defined(BOOST_HAS_WINTHREADS) + m_thread = _beginthreadex(0, 0, &thread_proxy, ¶m, 0, &m_id); + assert(m_thread); +#elif defined(BOOST_HAS_PTHREADS) + int res = pthread_create(&m_thread, 0, &thread_proxy, ¶m); + assert(res == 0); +#endif + param.wait(); +#if defined(BOOST_HAS_PTHREADS) + m_list = param.m_list; + assert(m_list); + m_list->add(this); +#endif } thread::~thread() { - if (_state) - _state->release(); + int res = 0; +#if defined(BOOST_HAS_WINTHREADS) + res = CloseHandle(reinterpret_cast(m_thread)); + assert(res); +#elif defined(BOOST_HAS_PTHREADS) + { + mutex::lock lock(m_mutex); + if (m_list) + m_list->remove(this); + } + + res = pthread_detach(m_thread); + assert(res == 0); +#endif } - bool thread::is_alive() const + bool thread::operator==(const thread& other) { - if (_state) - return _state->is_alive(); - return false; +#if defined(BOOST_HAS_WINTHREADS) + return other.m_id == m_id; +#elif defined(BOOST_HAS_PTHREADS) + return pthread_equal(m_thread, other.m_thread) != 0; +#endif + } + + bool thread::operator!=(const thread& other) + { + return operator!=(other); } void thread::join() { - if (_state) - _state->join(); + int res; +#if defined(BOOST_HAS_WINTHREADS) + res = WaitForSingleObject(reinterpret_cast(m_thread), INFINITE); + assert(res == WAIT_OBJECT_0); +#elif defined(BOOST_HAS_PTHREADS) + mutex::lock lock(m_mutex); + while (m_list) + m_cond.wait(lock); +#endif } - thread thread::create(const detail::threadfunc& func) + bool thread::try_join() { - thread temp; - temp._state = detail::thread_state::create(func); - return temp; +#if defined(BOOST_HAS_WINTHREADS) + return WaitForSingleObject(reinterpret_cast(m_thread), 0) == WAIT_OBJECT_0; +#elif defined(BOOST_HAS_PTHREADS) + mutex::lock lock(m_mutex); + bool ret = (m_list == 0); + return ret; +#endif } - thread thread::self() + bool thread::timed_join(const xtime& xt) { - thread temp; - temp._state = static_cast(tss_state.get()); - if (temp._state) - temp._state->add_ref(); - return temp; - } - - void thread::join_all() - { - threads.wait(); +#if defined(BOOST_HAS_WINTHREADS) + unsigned milliseconds; + to_duration(xt, milliseconds); + return WaitForSingleObject(reinterpret_cast(m_thread), 0) == WAIT_OBJECT_0; +#elif defined(BOOST_HAS_PTHREADS) + mutex::lock lock(m_mutex); + while (m_list) + { + if (!m_cond.timed_wait(lock, xt)) + break; + } + bool ret = (m_list == 0); + return ret; +#endif } void thread::sleep(const xtime& xt) @@ -336,4 +302,57 @@ namespace boost # endif #endif } + + thread_group::thread_group() + { + } + + thread_group::~thread_group() + { + // We shouldn't have to lock here, since referencing this object from another thread + // while we're deleting it in the current thread is going to lead to undefined behavior + // any way. + for (std::list::iterator it = m_threads.begin(); it != m_threads.end(); ++it) + delete (*it); + } + + thread* thread_group::create_thread(const function0& threadfunc) + { + // No lock required here since the only "shared data" that's modified here occurs + // inside add_thread which does lock. + std::auto_ptr thrd(new thread(threadfunc)); + add_thread(thrd.get()); + return thrd.release(); + } + + void thread_group::add_thread(thread* thrd) + { + mutex::lock lock(m_mutex); + + // For now we'll simply ignore requests to add a thread object multiple times. + // Should we consider this an error and either throw or return an error value? + std::list::iterator it = std::find(m_threads.begin(), m_threads.end(), thrd); + assert(it == m_threads.end()); + if (it == m_threads.end()) + m_threads.push_back(thrd); + } + + void thread_group::remove_thread(thread* thrd) + { + mutex::lock lock(m_mutex); + + // For now we'll simply ignore requests to remove a thread object that's not in the group. + // Should we consider this an error and either throw or return an error value? + std::list::iterator it = std::find(m_threads.begin(), m_threads.end(), thrd); + assert(it != m_threads.end()); + if (it != m_threads.end()) + m_threads.erase(it); + } + + void thread_group::join_all() + { + mutex::lock lock(m_mutex); + for (std::list::iterator it = m_threads.begin(); it != m_threads.end(); ++it) + (*it)->join(); + } } \ No newline at end of file diff --git a/test/test_thread.cpp b/test/test_thread.cpp index 0ae4ef1a..bc016632 100644 --- a/test/test_thread.cpp +++ b/test/test_thread.cpp @@ -5,6 +5,7 @@ #include //#include #include +#include #define BOOST_INCLUDE_MAIN #include @@ -224,7 +225,7 @@ void test_condition_notify_one() { condition_test_data data; - boost::thread::create(thread_adapter(&condition_test_thread, &data)); + boost::thread thread(thread_adapter(&condition_test_thread, &data)); { boost::mutex::lock lock(data.mutex); @@ -233,17 +234,18 @@ void test_condition_notify_one() data.condition.notify_one(); } - boost::thread::join_all(); + thread.join(); BOOST_TEST(data.awoken == 1); } void test_condition_notify_all() { const int NUMTHREADS = 5; + boost::thread_group threads; condition_test_data data; for (int i = 0; i < NUMTHREADS; ++i) - boost::thread::create(thread_adapter(&condition_test_thread, &data)); + threads.create_thread(thread_adapter(&condition_test_thread, &data)); { boost::mutex::lock lock(data.mutex); @@ -252,7 +254,7 @@ void test_condition_notify_all() data.condition.notify_all(); } - boost::thread::join_all(); + threads.join_all(); BOOST_TEST(data.awoken == NUMTHREADS); } @@ -312,7 +314,7 @@ void test_condition_waits() { condition_test_data data; - boost::thread::create(thread_adapter(&condition_test_waits, &data)); + boost::thread thread(thread_adapter(&condition_test_waits, &data)); boost::xtime xt; @@ -356,7 +358,7 @@ void test_condition_waits() BOOST_TEST(boost::xtime_get(&xt, boost::TIME_UTC) == boost::TIME_UTC); xt.sec += 1; boost::thread::sleep(xt); - boost::thread::join_all(); + thread.join(); BOOST_TEST(data.awoken == 4); } @@ -416,9 +418,11 @@ void test_tss_thread() void test_tss() { + const int NUMTHREADS=5; + boost::thread_group threads; for (int i=0; i<5; ++i) - boost::thread::create(&test_tss_thread); - boost::thread::join_all(); + threads.create_thread(&test_tss_thread); + threads.join_all(); } int test_main(int, char*[])