2
0
mirror of https://github.com/boostorg/asio.git synced 2026-02-26 02:42:08 +00:00

Fix timer stalls.

[SVN r40924]
This commit is contained in:
Christopher Kohlhoff
2007-11-08 07:52:49 +00:00
parent 1086746c7d
commit 7de155b732

View File

@@ -306,7 +306,8 @@ public:
{
timer_interrupt_issued_ = true;
lock.unlock();
::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
::PostQueuedCompletionStatus(iocp_.handle,
0, steal_timer_dispatching, 0);
}
}
}
@@ -326,7 +327,8 @@ public:
{
timer_interrupt_issued_ = true;
lock.unlock();
::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
::PostQueuedCompletionStatus(iocp_.handle,
0, steal_timer_dispatching, 0);
}
return n;
}
@@ -337,19 +339,17 @@ private:
// either 0 or 1).
size_t do_one(bool block, boost::system::error_code& ec)
{
bool doing_timers = false;
long this_thread_id = static_cast<long>(::GetCurrentThreadId());
for (;;)
{
// Try to become the timer thread.
if (!doing_timers)
{
doing_timers = (InterlockedCompareExchange(&timer_thread_,
static_cast<long>(GetCurrentThreadId()), 0) == 0);
}
// Try to acquire responsibility for dispatching timers.
bool dispatching_timers = (::InterlockedCompareExchange(
&timer_thread_, this_thread_id, 0) == 0);
// Calculate timeout for GetQueuedCompletionStatus call.
DWORD timeout = 1000;
if (doing_timers)
DWORD timeout = max_timeout;
if (dispatching_timers)
{
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
timer_interrupt_issued_ = false;
@@ -370,35 +370,37 @@ private:
DWORD last_error = ::GetLastError();
// Dispatch any pending timers.
if (doing_timers)
if (dispatching_timers)
{
boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
timer_queues_copy_ = timer_queues_;
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
{
timer_queues_[i]->dispatch_timers();
timer_queues_[i]->dispatch_cancellations();
timer_queues_[i]->cleanup_timers();
}
// Clean up timers. We must not hold the lock while cleaning up timers
// since the destructors may make calls back into this service. We make
// a copy of the vector of timer queues since the original may be
// modified while the lock is not held.
timer_queues_for_cleanup_ = timer_queues_;
lock.unlock();
for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
timer_queues_for_cleanup_[i]->cleanup_timers();
}
if (!ok && overlapped == 0)
{
if (block && last_error == WAIT_TIMEOUT)
continue;
// Pass responsibility for timers to another thread.
if (doing_timers)
{
::InterlockedExchange(&timer_thread_, 0);
::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
// Relinquish responsibility for dispatching timers.
if (dispatching_timers)
{
::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
}
continue;
}
// Transfer responsibility for dispatching timers to another thread.
if (dispatching_timers && ::InterlockedCompareExchange(
&timer_thread_, 0, this_thread_id) == this_thread_id)
{
::PostQueuedCompletionStatus(iocp_.handle,
0, transfer_timer_dispatching, 0);
}
ec = boost::system::error_code();
@@ -412,11 +414,12 @@ private:
last_error = completion_key;
}
// Pass responsibility for timers to another thread.
if (doing_timers)
// Transfer responsibility for dispatching timers to another thread.
if (dispatching_timers && ::InterlockedCompareExchange(
&timer_thread_, 0, this_thread_id) == this_thread_id)
{
::InterlockedExchange(&timer_thread_, 0);
::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
::PostQueuedCompletionStatus(iocp_.handle,
0, transfer_timer_dispatching, 0);
}
// Ensure that the io_service does not exit due to running out of work
@@ -430,9 +433,15 @@ private:
ec = boost::system::error_code();
return 1;
}
else if (completion_key == 1)
else if (completion_key == transfer_timer_dispatching)
{
// Woken up to try to become the timer thread.
// Woken up to try to acquire responsibility for dispatching timers.
::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
}
else if (completion_key == steal_timer_dispatching)
{
// Woken up to steal responsibility for dispatching timers.
::InterlockedExchange(&timer_thread_, 0);
}
else
{
@@ -440,10 +449,10 @@ private:
// interrupts from a previous run invocation are ignored.
if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
{
// Pass responsibility for timers to another thread.
if (doing_timers)
// Relinquish responsibility for dispatching timers.
if (dispatching_timers)
{
::InterlockedExchange(&timer_thread_, 0);
::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
}
// Wake up next thread that is blocked on GetQueuedCompletionStatus.
@@ -477,10 +486,10 @@ private:
DWORD get_timeout()
{
if (all_timer_queues_are_empty())
return 1000;
return max_timeout;
boost::posix_time::time_duration minimum_wait_duration
= boost::posix_time::seconds(1);
= boost::posix_time::milliseconds(max_timeout);
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
{
@@ -591,6 +600,20 @@ private:
// Flag to indicate whether the service has been shut down.
long shutdown_;
enum
{
// Maximum GetQueuedCompletionStatus timeout, in milliseconds.
max_timeout = 1000,
// Completion key value to indicate that responsibility for dispatching
// timers is being cooperatively transferred from one thread to another.
transfer_timer_dispatching = 1,
// Completion key value to indicate that responsibility for dispatching
// timers should be stolen from another thread.
steal_timer_dispatching = 2
};
// The thread that's currently in charge of dispatching timers.
long timer_thread_;
@@ -603,9 +626,10 @@ private:
// The timer queues.
std::vector<timer_queue_base*> timer_queues_;
// A copy of the timer queues, used when cleaning up timers. The copy is
// stored as a class data member to avoid unnecessary memory allocation.
std::vector<timer_queue_base*> timer_queues_for_cleanup_;
// A copy of the timer queues, used when dispatching, cancelling and cleaning
// up timers. The copy is stored as a class data member to avoid unnecessary
// memory allocation.
std::vector<timer_queue_base*> timer_queues_copy_;
};
} // namespace detail