From d0d1db2feb7cddc82ee8779bdb0556c84be79734 Mon Sep 17 00:00:00 2001 From: Anthony Williams Date: Thu, 15 Mar 2007 13:39:59 +0000 Subject: [PATCH] New condvar implementation that doesn't depend on APC calls, so OS can pick thread to wake [SVN r37188] --- include/boost/thread/win32/condition.hpp | 320 +++++++++++++---------- 1 file changed, 182 insertions(+), 138 deletions(-) diff --git a/include/boost/thread/win32/condition.hpp b/include/boost/thread/win32/condition.hpp index 199236c4..ac68ae35 100644 --- a/include/boost/thread/win32/condition.hpp +++ b/include/boost/thread/win32/condition.hpp @@ -1,181 +1,225 @@ -// (C) Copyright 2005-6 Anthony Williams -// Copyright 2006 Roland Schwarz. -// Distributed under the Boost Software License, Version 1.0. (See -// accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_THREAD_RS06041001_HPP -#define BOOST_THREAD_RS06041001_HPP - -#include - -#include -#include -#include -#include -#include -#include -#include -#include +#ifndef CONDITION_HPP +#define CONDITION_HPP +#include "boost/config.hpp" +#include "boost/thread/mutex.hpp" +#include "boost/thread/win32/thread_primitives.hpp" +#include "boost/thread/win32/xtime.hpp" +#include "boost/thread/win32/xtime_utils.hpp" +#include +#include "boost/assert.hpp" +#include namespace boost { - template - class basic_condition: - noncopyable + template + class basic_condition { - private: - struct waiting_list_entry - { - void* waiting_thread_handle; - waiting_list_entry* next; - waiting_list_entry* previous; - long notified; + boost::mutex internal_mutex; - void unlink() - { - next->previous=previous; - previous->next=next; - next=this; - previous=this; - } + struct list_entry + { + detail::win32::handle semaphore; + long count; + bool notified; }; - typedef ::boost::mutex gate_type; - gate_type state_change_gate; - typedef gate_type::scoped_lock gate_scoped_lock; - waiting_list_entry waiting_list; + BOOST_STATIC_CONSTANT(unsigned,generation_count=10); - struct add_entry_to_list + list_entry generations[generation_count]; + detail::win32::handle wake_sem; + + static bool no_waiters(list_entry const& entry) { - basic_condition* self; - waiting_list_entry& entry; - lockable_type& m; + return entry.count==0; + } - add_entry_to_list(basic_condition* self_,waiting_list_entry& entry_,lockable_type& m_): - self(self_),entry(entry_),m(m_) + void shift_generations_down() + { + if(std::remove_if(generations,generations+generation_count,no_waiters)==generations+generation_count) { - entry.previous=&self->waiting_list; - gate_scoped_lock lock(self->state_change_gate); - - entry.next=self->waiting_list.next; - self->waiting_list.next=&entry; - entry.next->previous=&entry; - - m.unlock(); + broadcast_entry(generations[generation_count-1],false); } - ~add_entry_to_list() + std::copy_backward(generations,generations+generation_count,generations+generation_count); + generations[0].semaphore=0; + generations[0].count=0; + generations[0].notified=false; + } + + void broadcast_entry(list_entry& entry,bool wake) + { + if(wake) { - if(!entry.notified) - { - gate_scoped_lock lock(self->state_change_gate); - - if(!entry.notified) - { - entry.unlink(); - } - } - detail::win32::CloseHandle(entry.waiting_thread_handle); - m.lock(); + detail::win32::ReleaseSemaphore(wake_sem,entry.count,NULL); } - }; + detail::win32::ReleaseSemaphore(entry.semaphore,entry.count,NULL); + entry.count=0; + dispose_entry(entry); + } - bool do_wait(lockable_type& m,boost::xtime const& target=::boost::detail::get_xtime_sentinel()) + void dispose_entry(list_entry& entry) { - waiting_list_entry entry={0}; - void* const currentProcess=detail::win32::GetCurrentProcess(); + BOOST_ASSERT(entry.count==0); + if(entry.semaphore) + { + unsigned long const close_result=detail::win32::CloseHandle(entry.semaphore); + BOOST_ASSERT(close_result); + } + entry.semaphore=0; + entry.notified=false; + } + + detail::win32::handle duplicate_handle(detail::win32::handle source) + { + detail::win32::handle const current_process=detail::win32::GetCurrentProcess(); long const same_access_flag=2; - bool const success=detail::win32::DuplicateHandle(currentProcess,detail::win32::GetCurrentThread(),currentProcess,&entry.waiting_thread_handle,0,false,same_access_flag)!=0; + detail::win32::handle new_handle=0; + bool const success=detail::win32::DuplicateHandle(current_process,source,current_process,&new_handle,0,false,same_access_flag)!=0; BOOST_ASSERT(success); - - { - add_entry_to_list list_guard(this,entry,m); - - unsigned const woken_due_to_apc=0xc0; - while(!::boost::detail::interlocked_read(&entry.notified) && - detail::win32::SleepEx(::boost::detail::get_milliseconds_until_time(target),true)==woken_due_to_apc); - } - - return ::boost::detail::interlocked_read(&entry.notified)!=0; + return new_handle; } - static void __stdcall notify_function(detail::win32::ulong_ptr) + bool do_wait(lock_type& lock,::boost::xtime const& target_time) { - } - - void notify_entry(waiting_list_entry * entry) - { - BOOST_INTERLOCKED_EXCHANGE(&entry->notified,true); - if(entry->waiting_thread_handle) + detail::win32::handle local_wake_sem; + detail::win32::handle sem; + bool first_loop=true; + bool woken=false; + while(!woken) { - detail::win32::QueueUserAPC(notify_function,entry->waiting_thread_handle,0); - } - } + { + boost::mutex::scoped_lock internal_lock(internal_mutex); + if(first_loop) + { + lock.unlock(); + if(!wake_sem) + { + wake_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX); + BOOST_ASSERT(wake_sem); + } + local_wake_sem=duplicate_handle(wake_sem); + + if(generations[0].notified) + { + shift_generations_down(); + } + if(!generations[0].semaphore) + { + generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX); + BOOST_ASSERT(generations[0].semaphore); + } + first_loop=false; + } + ++generations[0].count; + sem=duplicate_handle(generations[0].semaphore); + } + unsigned long const notified=detail::win32::WaitForSingleObject(sem,::boost::detail::get_milliseconds_until_time(target_time)); + BOOST_ASSERT(notified==detail::win32::timeout || notified==0); + unsigned long const sem_close_result=detail::win32::CloseHandle(sem); + BOOST_ASSERT(sem_close_result); + + if(notified==detail::win32::timeout) + { + break; + } + + unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0); + BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0); + + woken=(woken_result==0); + } + unsigned long const wake_sem_close_result=detail::win32::CloseHandle(local_wake_sem); + BOOST_ASSERT(wake_sem_close_result); + lock.lock(); + return woken; + } + public: - basic_condition() + basic_condition(): + wake_sem(0) { - waiting_list.next=&waiting_list; - waiting_list.previous=&waiting_list; + for(unsigned i=0;i + void wait(lock_type& m,predicate_type pred) + { + while(!pred()) wait(m); + } + + + bool timed_wait(lock_type& m,::boost::xtime const& target_time) + { + return do_wait(m,target_time); + } + + template + bool timed_wait(lock_type& m,::boost::xtime const& target_time,predicate_type pred) + { + while (!pred()) { if (!timed_wait(m, target_time)) return false; } return true; + } + void notify_one() { - gate_scoped_lock lock(state_change_gate); - if(waiting_list.previous!=&waiting_list) + boost::mutex::scoped_lock internal_lock(internal_mutex); + if(wake_sem) { - waiting_list_entry* const entry=waiting_list.previous; - entry->unlink(); - notify_entry(entry); + detail::win32::ReleaseSemaphore(wake_sem,1,NULL); + for(unsigned generation=generation_count;generation!=0;--generation) + { + list_entry& entry=generations[generation-1]; + if(entry.count) + { + entry.notified=true; + detail::win32::ReleaseSemaphore(entry.semaphore,1,NULL); + if(!--entry.count) + { + dispose_entry(entry); + } + } + } } } void notify_all() { - gate_scoped_lock lock(state_change_gate); - waiting_list_entry* head=waiting_list.previous; - waiting_list.previous=&waiting_list; - waiting_list.next=&waiting_list; - while(head!=&waiting_list) + boost::mutex::scoped_lock internal_lock(internal_mutex); + if(wake_sem) { - waiting_list_entry* const previous=head->previous; - notify_entry(head); - head=previous; + for(unsigned generation=generation_count;generation!=0;--generation) + { + list_entry& entry=generations[generation-1]; + if(entry.count) + { + broadcast_entry(entry,true); + } + } } } - - void wait(lockable_type& m) - { - do_wait(m); - } - - template - void wait(lockable_type& m,predicate_type pred) - { - while(!pred()) do_wait(m); - } - - bool timed_wait(lockable_type& m,const xtime& xt) - { - return do_wait(m,xt); - } - - template - bool timed_wait(lockable_type& m,const xtime& xt,predicate_type pred) - { - while (!pred()) - { - if (!timed_wait(m, xt)) return false; - } - return true; - } + }; - - class condition: - public basic_condition - {}; + + typedef basic_condition condition; } -#endif // BOOST_THREAD_RS06041001_HPP +#endif