2
0
mirror of https://github.com/boostorg/thread.git synced 2026-02-03 09:42:16 +00:00

New condvar implementation that doesn't depend on APC calls, so OS can pick thread to wake

[SVN r37188]
This commit is contained in:
Anthony Williams
2007-03-15 13:39:59 +00:00
parent 89348a9569
commit d0d1db2feb

View File

@@ -1,181 +1,225 @@
// (C) Copyright 2005-6 Anthony Williams
// Copyright 2006 Roland Schwarz.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_THREAD_RS06041001_HPP
#define BOOST_THREAD_RS06041001_HPP
#include <boost/thread/win32/config.hpp>
#include <boost/detail/interlocked.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/thread/win32/thread_primitives.hpp>
#include <boost/thread/win32/xtime_utils.hpp>
#include <boost/thread/win32/interlocked_read.hpp>
#include <boost/assert.hpp>
#include <boost/utility.hpp>
#ifndef CONDITION_HPP
#define CONDITION_HPP
#include "boost/config.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/thread/win32/thread_primitives.hpp"
#include "boost/thread/win32/xtime.hpp"
#include "boost/thread/win32/xtime_utils.hpp"
#include <limits.h>
#include "boost/assert.hpp"
#include <algorithm>
namespace boost
{
template<typename lockable_type>
class basic_condition:
noncopyable
template<typename lock_type>
class basic_condition
{
private:
struct waiting_list_entry
{
void* waiting_thread_handle;
waiting_list_entry* next;
waiting_list_entry* previous;
long notified;
boost::mutex internal_mutex;
void unlink()
{
next->previous=previous;
previous->next=next;
next=this;
previous=this;
}
struct list_entry
{
detail::win32::handle semaphore;
long count;
bool notified;
};
typedef ::boost::mutex gate_type;
gate_type state_change_gate;
typedef gate_type::scoped_lock gate_scoped_lock;
waiting_list_entry waiting_list;
BOOST_STATIC_CONSTANT(unsigned,generation_count=10);
struct add_entry_to_list
list_entry generations[generation_count];
detail::win32::handle wake_sem;
static bool no_waiters(list_entry const& entry)
{
basic_condition* self;
waiting_list_entry& entry;
lockable_type& m;
return entry.count==0;
}
add_entry_to_list(basic_condition* self_,waiting_list_entry& entry_,lockable_type& m_):
self(self_),entry(entry_),m(m_)
void shift_generations_down()
{
if(std::remove_if(generations,generations+generation_count,no_waiters)==generations+generation_count)
{
entry.previous=&self->waiting_list;
gate_scoped_lock lock(self->state_change_gate);
entry.next=self->waiting_list.next;
self->waiting_list.next=&entry;
entry.next->previous=&entry;
m.unlock();
broadcast_entry(generations[generation_count-1],false);
}
~add_entry_to_list()
std::copy_backward(generations,generations+generation_count,generations+generation_count);
generations[0].semaphore=0;
generations[0].count=0;
generations[0].notified=false;
}
void broadcast_entry(list_entry& entry,bool wake)
{
if(wake)
{
if(!entry.notified)
{
gate_scoped_lock lock(self->state_change_gate);
if(!entry.notified)
{
entry.unlink();
}
}
detail::win32::CloseHandle(entry.waiting_thread_handle);
m.lock();
detail::win32::ReleaseSemaphore(wake_sem,entry.count,NULL);
}
};
detail::win32::ReleaseSemaphore(entry.semaphore,entry.count,NULL);
entry.count=0;
dispose_entry(entry);
}
bool do_wait(lockable_type& m,boost::xtime const& target=::boost::detail::get_xtime_sentinel())
void dispose_entry(list_entry& entry)
{
waiting_list_entry entry={0};
void* const currentProcess=detail::win32::GetCurrentProcess();
BOOST_ASSERT(entry.count==0);
if(entry.semaphore)
{
unsigned long const close_result=detail::win32::CloseHandle(entry.semaphore);
BOOST_ASSERT(close_result);
}
entry.semaphore=0;
entry.notified=false;
}
detail::win32::handle duplicate_handle(detail::win32::handle source)
{
detail::win32::handle const current_process=detail::win32::GetCurrentProcess();
long const same_access_flag=2;
bool const success=detail::win32::DuplicateHandle(currentProcess,detail::win32::GetCurrentThread(),currentProcess,&entry.waiting_thread_handle,0,false,same_access_flag)!=0;
detail::win32::handle new_handle=0;
bool const success=detail::win32::DuplicateHandle(current_process,source,current_process,&new_handle,0,false,same_access_flag)!=0;
BOOST_ASSERT(success);
{
add_entry_to_list list_guard(this,entry,m);
unsigned const woken_due_to_apc=0xc0;
while(!::boost::detail::interlocked_read(&entry.notified) &&
detail::win32::SleepEx(::boost::detail::get_milliseconds_until_time(target),true)==woken_due_to_apc);
}
return ::boost::detail::interlocked_read(&entry.notified)!=0;
return new_handle;
}
static void __stdcall notify_function(detail::win32::ulong_ptr)
bool do_wait(lock_type& lock,::boost::xtime const& target_time)
{
}
void notify_entry(waiting_list_entry * entry)
{
BOOST_INTERLOCKED_EXCHANGE(&entry->notified,true);
if(entry->waiting_thread_handle)
detail::win32::handle local_wake_sem;
detail::win32::handle sem;
bool first_loop=true;
bool woken=false;
while(!woken)
{
detail::win32::QueueUserAPC(notify_function,entry->waiting_thread_handle,0);
}
}
{
boost::mutex::scoped_lock internal_lock(internal_mutex);
if(first_loop)
{
lock.unlock();
if(!wake_sem)
{
wake_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
BOOST_ASSERT(wake_sem);
}
local_wake_sem=duplicate_handle(wake_sem);
if(generations[0].notified)
{
shift_generations_down();
}
if(!generations[0].semaphore)
{
generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
BOOST_ASSERT(generations[0].semaphore);
}
first_loop=false;
}
++generations[0].count;
sem=duplicate_handle(generations[0].semaphore);
}
unsigned long const notified=detail::win32::WaitForSingleObject(sem,::boost::detail::get_milliseconds_until_time(target_time));
BOOST_ASSERT(notified==detail::win32::timeout || notified==0);
unsigned long const sem_close_result=detail::win32::CloseHandle(sem);
BOOST_ASSERT(sem_close_result);
if(notified==detail::win32::timeout)
{
break;
}
unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0);
BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0);
woken=(woken_result==0);
}
unsigned long const wake_sem_close_result=detail::win32::CloseHandle(local_wake_sem);
BOOST_ASSERT(wake_sem_close_result);
lock.lock();
return woken;
}
public:
basic_condition()
basic_condition():
wake_sem(0)
{
waiting_list.next=&waiting_list;
waiting_list.previous=&waiting_list;
for(unsigned i=0;i<generation_count;++i)
{
generations[i]=list_entry();
}
}
~basic_condition()
{
for(unsigned i=0;i<generation_count;++i)
{
dispose_entry(generations[i]);
}
detail::win32::CloseHandle(wake_sem);
}
void wait(lock_type& m)
{
do_wait(m,::boost::detail::get_xtime_sentinel());
}
template<typename predicate_type>
void wait(lock_type& m,predicate_type pred)
{
while(!pred()) wait(m);
}
bool timed_wait(lock_type& m,::boost::xtime const& target_time)
{
return do_wait(m,target_time);
}
template<typename predicate_type>
bool timed_wait(lock_type& m,::boost::xtime const& target_time,predicate_type pred)
{
while (!pred()) { if (!timed_wait(m, target_time)) return false; } return true;
}
void notify_one()
{
gate_scoped_lock lock(state_change_gate);
if(waiting_list.previous!=&waiting_list)
boost::mutex::scoped_lock internal_lock(internal_mutex);
if(wake_sem)
{
waiting_list_entry* const entry=waiting_list.previous;
entry->unlink();
notify_entry(entry);
detail::win32::ReleaseSemaphore(wake_sem,1,NULL);
for(unsigned generation=generation_count;generation!=0;--generation)
{
list_entry& entry=generations[generation-1];
if(entry.count)
{
entry.notified=true;
detail::win32::ReleaseSemaphore(entry.semaphore,1,NULL);
if(!--entry.count)
{
dispose_entry(entry);
}
}
}
}
}
void notify_all()
{
gate_scoped_lock lock(state_change_gate);
waiting_list_entry* head=waiting_list.previous;
waiting_list.previous=&waiting_list;
waiting_list.next=&waiting_list;
while(head!=&waiting_list)
boost::mutex::scoped_lock internal_lock(internal_mutex);
if(wake_sem)
{
waiting_list_entry* const previous=head->previous;
notify_entry(head);
head=previous;
for(unsigned generation=generation_count;generation!=0;--generation)
{
list_entry& entry=generations[generation-1];
if(entry.count)
{
broadcast_entry(entry,true);
}
}
}
}
void wait(lockable_type& m)
{
do_wait(m);
}
template<typename predicate_type>
void wait(lockable_type& m,predicate_type pred)
{
while(!pred()) do_wait(m);
}
bool timed_wait(lockable_type& m,const xtime& xt)
{
return do_wait(m,xt);
}
template<typename predicate_type>
bool timed_wait(lockable_type& m,const xtime& xt,predicate_type pred)
{
while (!pred())
{
if (!timed_wait(m, xt)) return false;
}
return true;
}
};
class condition:
public basic_condition<boost::mutex::scoped_lock>
{};
typedef basic_condition<boost::mutex::scoped_lock> condition;
}
#endif // BOOST_THREAD_RS06041001_HPP
#endif