From 370f5d461cec744601ac1fda6dd44180a0bd83e9 Mon Sep 17 00:00:00 2001 From: Anthony Williams Date: Thu, 1 Nov 2007 17:07:47 +0000 Subject: [PATCH] condition wait and sleep are now cancellation points [SVN r40647] --- .../thread/pthread/condition_variable.hpp | 134 ++++++++---------- .../thread/pthread/condition_variable_fwd.hpp | 53 +++++++ include/boost/thread/pthread/thread.hpp | 35 +---- include/boost/thread/pthread/thread_data.hpp | 93 ++++++++++++ .../boost/thread/win32/condition_variable.hpp | 5 +- include/boost/thread/win32/thread.hpp | 6 +- src/pthread/thread.cpp | 134 ++++++++++-------- test/test_condition.cpp | 23 ++- 8 files changed, 309 insertions(+), 174 deletions(-) create mode 100644 include/boost/thread/pthread/condition_variable_fwd.hpp create mode 100644 include/boost/thread/pthread/thread_data.hpp diff --git a/include/boost/thread/pthread/condition_variable.hpp b/include/boost/thread/pthread/condition_variable.hpp index 99f58c57..84a1e8cb 100644 --- a/include/boost/thread/pthread/condition_variable.hpp +++ b/include/boost/thread/pthread/condition_variable.hpp @@ -13,78 +13,56 @@ #include #include "timespec.hpp" #include "pthread_mutex_scoped_lock.hpp" +#include "thread_data.hpp" +#include "condition_variable_fwd.hpp" namespace boost { - class condition_variable + inline condition_variable::condition_variable() { - private: - pthread_cond_t cond; + int const res=pthread_cond_init(&cond,NULL); + if(res) + { + throw thread_resource_error(); + } + } + inline condition_variable::~condition_variable() + { + int const res=pthread_cond_destroy(&cond); + BOOST_ASSERT(!res); + } + + inline void condition_variable::wait(unique_lock& m) + { + detail::cancel_wrapper allow_cancel(&cond); + int const cond_res=pthread_cond_wait(&cond,m.mutex()->native_handle()); + BOOST_ASSERT(!cond_res); + } + + inline bool condition_variable::timed_wait(unique_lock& m,boost::system_time const& wait_until) + { + detail::cancel_wrapper allow_cancel(&cond); + struct timespec const timeout=detail::get_timespec(wait_until); + int const cond_res=pthread_cond_timedwait(&cond,m.mutex()->native_handle(),&timeout); + if(cond_res==ETIMEDOUT) + { + return false; + } + BOOST_ASSERT(!cond_res); + return true; + } + + inline void condition_variable::notify_one() + { + int const res=pthread_cond_signal(&cond); + BOOST_ASSERT(!res); + } - condition_variable(condition_variable&); - condition_variable& operator=(condition_variable&); - public: - condition_variable() - { - int const res=pthread_cond_init(&cond,NULL); - if(res) - { - throw thread_resource_error(); - } - } - ~condition_variable() - { - int const res=pthread_cond_destroy(&cond); - BOOST_ASSERT(!res); - } - - void wait(unique_lock& m) - { - int const cond_res=pthread_cond_wait(&cond,m.mutex()->native_handle()); - BOOST_ASSERT(!cond_res); - } - - template - void wait(unique_lock& m,predicate_type pred) - { - while(!pred()) wait(m); - } - - bool timed_wait(unique_lock& m,boost::system_time const& wait_until) - { - struct timespec const timeout=detail::get_timespec(wait_until); - int const cond_res=pthread_cond_timedwait(&cond,m.mutex()->native_handle(),&timeout); - if(cond_res==ETIMEDOUT) - { - return false; - } - BOOST_ASSERT(!cond_res); - return true; - } - - template - bool timed_wait(unique_lock& m,boost::system_time const& wait_until,predicate_type pred) - { - while (!pred()) - { - if(!timed_wait(m, wait_until)) - return false; - } - return true; - } - - void notify_one() - { - int const res=pthread_cond_signal(&cond); - BOOST_ASSERT(!res); - } - - void notify_all() - { - int const res=pthread_cond_broadcast(&cond); - BOOST_ASSERT(!res); - } - }; + inline void condition_variable::notify_all() + { + int const res=pthread_cond_broadcast(&cond); + BOOST_ASSERT(!res); + } class condition_variable_any { @@ -123,11 +101,14 @@ namespace boost { int res=0; { - boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); - m.unlock(); - res=pthread_cond_wait(&cond,&internal_mutex); + detail::cancel_wrapper allow_cancel(&cond); + { + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); + m.unlock(); + res=pthread_cond_wait(&cond,&internal_mutex); + } + m.lock(); } - m.lock(); if(res) { throw condition_error(); @@ -146,11 +127,14 @@ namespace boost struct timespec const timeout=detail::get_timespec(wait_until); int res=0; { - boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); - m.unlock(); - res=pthread_cond_timedwait(&cond,&internal_mutex,&timeout); + detail::cancel_wrapper allow_cancel(&cond); + { + boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex); + m.unlock(); + res=pthread_cond_timedwait(&cond,&internal_mutex,&timeout); + } + m.lock(); } - m.lock(); if(res==ETIMEDOUT) { return false; diff --git a/include/boost/thread/pthread/condition_variable_fwd.hpp b/include/boost/thread/pthread/condition_variable_fwd.hpp new file mode 100644 index 00000000..fb38f584 --- /dev/null +++ b/include/boost/thread/pthread/condition_variable_fwd.hpp @@ -0,0 +1,53 @@ +#ifndef BOOST_THREAD_PTHREAD_CONDITION_VARIABLE_FWD_HPP +#define BOOST_THREAD_PTHREAD_CONDITION_VARIABLE_FWD_HPP +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// (C) Copyright 2007 Anthony Williams + +#include +#include +#include + +namespace boost +{ + class condition_variable + { + private: + pthread_cond_t cond; + + condition_variable(condition_variable&); + condition_variable& operator=(condition_variable&); + + struct cancel_wrapper; + public: + condition_variable(); + ~condition_variable(); + + void wait(unique_lock& m); + + template + void wait(unique_lock& m,predicate_type pred) + { + while(!pred()) wait(m); + } + + bool timed_wait(unique_lock& m,boost::system_time const& wait_until); + + template + bool timed_wait(unique_lock& m,boost::system_time const& wait_until,predicate_type pred) + { + while (!pred()) + { + if(!timed_wait(m, wait_until)) + return false; + } + return true; + } + + void notify_one(); + void notify_all(); + }; +} + +#endif diff --git a/include/boost/thread/pthread/thread.hpp b/include/boost/thread/pthread/thread.hpp index 4ca7db08..baf04d19 100644 --- a/include/boost/thread/pthread/thread.hpp +++ b/include/boost/thread/pthread/thread.hpp @@ -20,10 +20,10 @@ #include #include #include +#include "thread_data.hpp" namespace boost { - class thread; namespace detail @@ -81,39 +81,6 @@ namespace boost }; } - class thread_cancelled - {}; - - namespace detail - { - struct thread_exit_callback_node; - - struct thread_data_base - { - boost::shared_ptr self; - pthread_t thread_handle; - boost::mutex data_mutex; - boost::condition_variable done_condition; - bool done; - bool join_started; - bool joined; - boost::detail::thread_exit_callback_node* thread_exit_callbacks; - bool cancel_enabled; - bool cancel_requested; - - thread_data_base(): - done(false),join_started(false),joined(false), - thread_exit_callbacks(0), - cancel_enabled(true), - cancel_requested(false) - {} - virtual ~thread_data_base() - {} - - virtual void run()=0; - }; - } - struct xtime; class BOOST_THREAD_DECL thread { diff --git a/include/boost/thread/pthread/thread_data.hpp b/include/boost/thread/pthread/thread_data.hpp new file mode 100644 index 00000000..93d2bbca --- /dev/null +++ b/include/boost/thread/pthread/thread_data.hpp @@ -0,0 +1,93 @@ +#ifndef BOOST_THREAD_PTHREAD_THREAD_DATA_HPP +#define BOOST_THREAD_PTHREAD_THREAD_DATA_HPP +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// (C) Copyright 2007 Anthony Williams + +#include +#include +#include +#include +#include +#include "condition_variable_fwd.hpp" + +namespace boost +{ + class thread_cancelled + {}; + + namespace detail + { + struct thread_exit_callback_node; + + struct thread_data_base + { + boost::shared_ptr self; + pthread_t thread_handle; + boost::mutex data_mutex; + boost::condition_variable done_condition; + boost::mutex sleep_mutex; + boost::condition_variable sleep_condition; + bool done; + bool join_started; + bool joined; + boost::detail::thread_exit_callback_node* thread_exit_callbacks; + bool cancel_enabled; + bool cancel_requested; + pthread_cond_t* current_cond; + + thread_data_base(): + done(false),join_started(false),joined(false), + thread_exit_callbacks(0), + cancel_enabled(true), + cancel_requested(false), + current_cond(0) + {} + virtual ~thread_data_base() + {} + + virtual void run()=0; + }; + + BOOST_THREAD_DECL thread_data_base* get_current_thread_data(); + + class cancel_wrapper + { + thread_data_base* const thread_info; + + void check_cancel() + { + if(thread_info->cancel_requested) + { + thread_info->cancel_requested=false; + throw thread_cancelled(); + } + } + + public: + explicit cancel_wrapper(pthread_cond_t* cond): + thread_info(detail::get_current_thread_data()) + { + if(thread_info && thread_info->cancel_enabled) + { + lock_guard guard(thread_info->data_mutex); + check_cancel(); + thread_info->current_cond=cond; + } + } + ~cancel_wrapper() + { + if(thread_info && thread_info->cancel_enabled) + { + lock_guard guard(thread_info->data_mutex); + thread_info->current_cond=NULL; + check_cancel(); + } + } + }; + } +} + + +#endif diff --git a/include/boost/thread/win32/condition_variable.hpp b/include/boost/thread/win32/condition_variable.hpp index 50319339..5f173e45 100644 --- a/include/boost/thread/win32/condition_variable.hpp +++ b/include/boost/thread/win32/condition_variable.hpp @@ -156,13 +156,10 @@ namespace boost ++generations[0].count; sem=detail::win32::duplicate_handle(generations[0].semaphore); } - unsigned long const wait_result=detail::win32::WaitForSingleObject(sem,::boost::detail::get_milliseconds_until(wait_until)); - - if(wait_result==detail::win32::timeout) + if(!this_thread::cancellable_wait(sem,::boost::detail::get_milliseconds_until(wait_until))) { break; } - BOOST_ASSERT(!wait_result); unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0); BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0); diff --git a/include/boost/thread/win32/thread.hpp b/include/boost/thread/win32/thread.hpp index 1e0575d8..9d2f00f1 100644 --- a/include/boost/thread/win32/thread.hpp +++ b/include/boost/thread/win32/thread.hpp @@ -197,6 +197,10 @@ namespace boost thread::id BOOST_THREAD_DECL get_id(); bool BOOST_THREAD_DECL cancellable_wait(detail::win32::handle handle_to_wait_for,unsigned long milliseconds); + inline bool cancellable_wait(unsigned long milliseconds) + { + return cancellable_wait(detail::win32::invalid_handle_value,milliseconds); + } void BOOST_THREAD_DECL cancellation_point(); bool BOOST_THREAD_DECL cancellation_enabled(); @@ -207,7 +211,7 @@ namespace boost template void sleep(TimeDuration const& rel_time) { - cancellable_wait(detail::win32::invalid_handle_value,static_cast(rel_time.total_milliseconds())); + cancellable_wait(static_cast(rel_time.total_milliseconds())); } } diff --git a/src/pthread/thread.cpp b/src/pthread/thread.cpp index bc9b6828..abe8f0f4 100644 --- a/src/pthread/thread.cpp +++ b/src/pthread/thread.cpp @@ -24,41 +24,42 @@ namespace boost boost::detail::thread_exit_function_base* func; thread_exit_callback_node* next; }; - } - namespace - { - boost::once_flag current_thread_tls_init_flag=BOOST_ONCE_INIT; - pthread_key_t current_thread_tls_key; - extern "C" + namespace { - void tls_destructor(void* data) + boost::once_flag current_thread_tls_init_flag=BOOST_ONCE_INIT; + pthread_key_t current_thread_tls_key; + + extern "C" { - boost::detail::thread_data_base* thread_info=static_cast(data); - if(thread_info) + void tls_destructor(void* data) { - while(thread_info->thread_exit_callbacks) + boost::detail::thread_data_base* thread_info=static_cast(data); + if(thread_info) { - boost::detail::thread_exit_callback_node* const current_node=thread_info->thread_exit_callbacks; - thread_info->thread_exit_callbacks=current_node->next; - if(current_node->func) + while(thread_info->thread_exit_callbacks) { - (*current_node->func)(); - delete current_node->func; + boost::detail::thread_exit_callback_node* const current_node=thread_info->thread_exit_callbacks; + thread_info->thread_exit_callbacks=current_node->next; + if(current_node->func) + { + (*current_node->func)(); + delete current_node->func; + } + delete current_node; } - delete current_node; } } } - } - void create_current_thread_tls_key() - { - int const res=pthread_key_create(¤t_thread_tls_key,NULL); - BOOST_ASSERT(!res); + void create_current_thread_tls_key() + { + int const res=pthread_key_create(¤t_thread_tls_key,NULL); + BOOST_ASSERT(!res); + } } - + boost::detail::thread_data_base* get_current_thread_data() { boost::call_once(current_thread_tls_init_flag,create_current_thread_tls_key); @@ -71,15 +72,17 @@ namespace boost int const res=pthread_setspecific(current_thread_tls_key,new_data); BOOST_ASSERT(!res); } - - + } + + namespace + { extern "C" { void* thread_proxy(void* param) { boost::shared_ptr thread_info = static_cast(param)->self; thread_info->self.reset(); - set_current_thread_data(thread_info.get()); + detail::set_current_thread_data(thread_info.get()); try { thread_info->run(); @@ -92,8 +95,8 @@ namespace boost std::terminate(); } - tls_destructor(thread_info.get()); - set_current_thread_data(0); + detail::tls_destructor(thread_info.get()); + detail::set_current_thread_data(0); boost::lock_guard lock(thread_info->data_mutex); thread_info->done=true; thread_info->done_condition.notify_all(); @@ -213,33 +216,43 @@ namespace boost void thread::sleep(const system_time& st) { - xtime const xt=get_xtime(st); - - for (int foo=0; foo < 5; ++foo) + detail::thread_data_base* const thread_info=detail::get_current_thread_data(); + + if(thread_info) { + unique_lock lk(thread_info->sleep_mutex); + while(thread_info->sleep_condition.timed_wait(lk,st)); + } + else + { + xtime const xt=get_xtime(st); + + for (int foo=0; foo < 5; ++foo) + { # if defined(BOOST_HAS_PTHREAD_DELAY_NP) - timespec ts; - to_timespec_duration(xt, ts); - int res = 0; - res = pthread_delay_np(&ts); - BOOST_ASSERT(res == 0); + timespec ts; + to_timespec_duration(xt, ts); + int res = 0; + res = pthread_delay_np(&ts); + BOOST_ASSERT(res == 0); # elif defined(BOOST_HAS_NANOSLEEP) - timespec ts; - to_timespec_duration(xt, ts); - - // nanosleep takes a timespec that is an offset, not - // an absolute time. - nanosleep(&ts, 0); + timespec ts; + to_timespec_duration(xt, ts); + + // nanosleep takes a timespec that is an offset, not + // an absolute time. + nanosleep(&ts, 0); # else - mutex mx; - mutex::scoped_lock lock(mx); - condition cond; - cond.timed_wait(lock, xt); + mutex mx; + mutex::scoped_lock lock(mx); + condition cond; + cond.timed_wait(lock, xt); # endif - xtime cur; - xtime_get(&cur, TIME_UTC); - if (xtime_cmp(xt, cur) <= 0) - return; + xtime cur; + xtime_get(&cur, TIME_UTC); + if (xtime_cmp(xt, cur) <= 0) + return; + } } } @@ -285,6 +298,11 @@ namespace boost { lock_guard lk(local_thread_info->data_mutex); local_thread_info->cancel_requested=true; + if(local_thread_info->current_cond) + { + int const res=pthread_cond_broadcast(local_thread_info->current_cond); + BOOST_ASSERT(!res); + } } } @@ -293,7 +311,7 @@ namespace boost { void cancellation_point() { - boost::detail::thread_data_base* const thread_info=get_current_thread_data(); + boost::detail::thread_data_base* const thread_info=detail::get_current_thread_data(); if(thread_info && thread_info->cancel_enabled) { lock_guard lg(thread_info->data_mutex); @@ -307,13 +325,13 @@ namespace boost bool cancellation_enabled() { - boost::detail::thread_data_base* const thread_info=get_current_thread_data(); + boost::detail::thread_data_base* const thread_info=detail::get_current_thread_data(); return thread_info && thread_info->cancel_enabled; } bool cancellation_requested() { - boost::detail::thread_data_base* const thread_info=get_current_thread_data(); + boost::detail::thread_data_base* const thread_info=detail::get_current_thread_data(); if(!thread_info) { return false; @@ -330,15 +348,15 @@ namespace boost { if(cancel_was_enabled) { - get_current_thread_data()->cancel_enabled=false; + detail::get_current_thread_data()->cancel_enabled=false; } } disable_cancellation::~disable_cancellation() { - if(get_current_thread_data()) + if(detail::get_current_thread_data()) { - get_current_thread_data()->cancel_enabled=cancel_was_enabled; + detail::get_current_thread_data()->cancel_enabled=cancel_was_enabled; } } @@ -346,15 +364,15 @@ namespace boost { if(d.cancel_was_enabled) { - get_current_thread_data()->cancel_enabled=true; + detail::get_current_thread_data()->cancel_enabled=true; } } restore_cancellation::~restore_cancellation() { - if(get_current_thread_data()) + if(detail::get_current_thread_data()) { - get_current_thread_data()->cancel_enabled=false; + detail::get_current_thread_data()->cancel_enabled=false; } } } diff --git a/test/test_condition.cpp b/test/test_condition.cpp index 4570f06e..dea55ead 100644 --- a/test/test_condition.cpp +++ b/test/test_condition.cpp @@ -103,7 +103,7 @@ void do_test_condition_notify_one() void test_condition_notify_one() { - timed_test(&do_test_condition_notify_one, 2, execution_monitor::use_mutex); + timed_test(&do_test_condition_notify_one, 100, execution_monitor::use_mutex); } void do_test_condition_notify_all() @@ -131,7 +131,7 @@ void test_condition_notify_all() // We should have already tested notify_one here, so // a timed test with the default execution_monitor::use_condition // should be OK, and gives the fastest performance - timed_test(&do_test_condition_notify_all, 3); + timed_test(&do_test_condition_notify_all, 100); } void do_test_condition_waits() @@ -189,6 +189,24 @@ void test_condition_waits() timed_test(&do_test_condition_waits, 12); } +void do_test_condition_wait_is_a_cancellation_point() +{ + condition_test_data data; + + boost::thread thread(bind(&condition_test_thread, &data)); + + thread.cancel(); + thread.join(); + BOOST_CHECK_EQUAL(data.awoken,0); +} + + +void test_condition_wait_is_a_cancellation_point() +{ + timed_test(&do_test_condition_wait_is_a_cancellation_point, 1); +} + + boost::unit_test_framework::test_suite* init_unit_test_suite(int, char*[]) { boost::unit_test_framework::test_suite* test = @@ -197,6 +215,7 @@ boost::unit_test_framework::test_suite* init_unit_test_suite(int, char*[]) test->add(BOOST_TEST_CASE(&test_condition_notify_one)); test->add(BOOST_TEST_CASE(&test_condition_notify_all)); test->add(BOOST_TEST_CASE(&test_condition_waits)); + test->add(BOOST_TEST_CASE(&test_condition_wait_is_a_cancellation_point)); return test; }