From 7de155b732bf2b7f2b70cb3ddee5a66b083015ef Mon Sep 17 00:00:00 2001 From: Christopher Kohlhoff Date: Thu, 8 Nov 2007 07:52:49 +0000 Subject: [PATCH] Fix timer stalls. [SVN r40924] --- .../boost/asio/detail/win_iocp_io_service.hpp | 106 +++++++++++------- 1 file changed, 65 insertions(+), 41 deletions(-) diff --git a/include/boost/asio/detail/win_iocp_io_service.hpp b/include/boost/asio/detail/win_iocp_io_service.hpp index f8536930..039052e8 100644 --- a/include/boost/asio/detail/win_iocp_io_service.hpp +++ b/include/boost/asio/detail/win_iocp_io_service.hpp @@ -306,7 +306,8 @@ public: { timer_interrupt_issued_ = true; lock.unlock(); - ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); + ::PostQueuedCompletionStatus(iocp_.handle, + 0, steal_timer_dispatching, 0); } } } @@ -326,7 +327,8 @@ public: { timer_interrupt_issued_ = true; lock.unlock(); - ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); + ::PostQueuedCompletionStatus(iocp_.handle, + 0, steal_timer_dispatching, 0); } return n; } @@ -337,19 +339,17 @@ private: // either 0 or 1). size_t do_one(bool block, boost::system::error_code& ec) { - bool doing_timers = false; + long this_thread_id = static_cast(::GetCurrentThreadId()); + for (;;) { - // Try to become the timer thread. - if (!doing_timers) - { - doing_timers = (InterlockedCompareExchange(&timer_thread_, - static_cast(GetCurrentThreadId()), 0) == 0); - } + // Try to acquire responsibility for dispatching timers. + bool dispatching_timers = (::InterlockedCompareExchange( + &timer_thread_, this_thread_id, 0) == 0); // Calculate timeout for GetQueuedCompletionStatus call. - DWORD timeout = 1000; - if (doing_timers) + DWORD timeout = max_timeout; + if (dispatching_timers) { boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); timer_interrupt_issued_ = false; @@ -370,35 +370,37 @@ private: DWORD last_error = ::GetLastError(); // Dispatch any pending timers. - if (doing_timers) + if (dispatching_timers) { boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); + timer_queues_copy_ = timer_queues_; for (std::size_t i = 0; i < timer_queues_.size(); ++i) { timer_queues_[i]->dispatch_timers(); timer_queues_[i]->dispatch_cancellations(); + timer_queues_[i]->cleanup_timers(); } - - // Clean up timers. We must not hold the lock while cleaning up timers - // since the destructors may make calls back into this service. We make - // a copy of the vector of timer queues since the original may be - // modified while the lock is not held. - timer_queues_for_cleanup_ = timer_queues_; - lock.unlock(); - for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i) - timer_queues_for_cleanup_[i]->cleanup_timers(); } if (!ok && overlapped == 0) { if (block && last_error == WAIT_TIMEOUT) - continue; - - // Pass responsibility for timers to another thread. - if (doing_timers) { - ::InterlockedExchange(&timer_thread_, 0); - ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); + // Relinquish responsibility for dispatching timers. + if (dispatching_timers) + { + ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); + } + + continue; + } + + // Transfer responsibility for dispatching timers to another thread. + if (dispatching_timers && ::InterlockedCompareExchange( + &timer_thread_, 0, this_thread_id) == this_thread_id) + { + ::PostQueuedCompletionStatus(iocp_.handle, + 0, transfer_timer_dispatching, 0); } ec = boost::system::error_code(); @@ -412,11 +414,12 @@ private: last_error = completion_key; } - // Pass responsibility for timers to another thread. - if (doing_timers) + // Transfer responsibility for dispatching timers to another thread. + if (dispatching_timers && ::InterlockedCompareExchange( + &timer_thread_, 0, this_thread_id) == this_thread_id) { - ::InterlockedExchange(&timer_thread_, 0); - ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); + ::PostQueuedCompletionStatus(iocp_.handle, + 0, transfer_timer_dispatching, 0); } // Ensure that the io_service does not exit due to running out of work @@ -430,9 +433,15 @@ private: ec = boost::system::error_code(); return 1; } - else if (completion_key == 1) + else if (completion_key == transfer_timer_dispatching) { - // Woken up to try to become the timer thread. + // Woken up to try to acquire responsibility for dispatching timers. + ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); + } + else if (completion_key == steal_timer_dispatching) + { + // Woken up to steal responsibility for dispatching timers. + ::InterlockedExchange(&timer_thread_, 0); } else { @@ -440,10 +449,10 @@ private: // interrupts from a previous run invocation are ignored. if (::InterlockedExchangeAdd(&stopped_, 0) != 0) { - // Pass responsibility for timers to another thread. - if (doing_timers) + // Relinquish responsibility for dispatching timers. + if (dispatching_timers) { - ::InterlockedExchange(&timer_thread_, 0); + ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id); } // Wake up next thread that is blocked on GetQueuedCompletionStatus. @@ -477,10 +486,10 @@ private: DWORD get_timeout() { if (all_timer_queues_are_empty()) - return 1000; + return max_timeout; boost::posix_time::time_duration minimum_wait_duration - = boost::posix_time::seconds(1); + = boost::posix_time::milliseconds(max_timeout); for (std::size_t i = 0; i < timer_queues_.size(); ++i) { @@ -591,6 +600,20 @@ private: // Flag to indicate whether the service has been shut down. long shutdown_; + enum + { + // Maximum GetQueuedCompletionStatus timeout, in milliseconds. + max_timeout = 1000, + + // Completion key value to indicate that responsibility for dispatching + // timers is being cooperatively transferred from one thread to another. + transfer_timer_dispatching = 1, + + // Completion key value to indicate that responsibility for dispatching + // timers should be stolen from another thread. + steal_timer_dispatching = 2 + }; + // The thread that's currently in charge of dispatching timers. long timer_thread_; @@ -603,9 +626,10 @@ private: // The timer queues. std::vector timer_queues_; - // A copy of the timer queues, used when cleaning up timers. The copy is - // stored as a class data member to avoid unnecessary memory allocation. - std::vector timer_queues_for_cleanup_; + // A copy of the timer queues, used when dispatching, cancelling and cleaning + // up timers. The copy is stored as a class data member to avoid unnecessary + // memory allocation. + std::vector timer_queues_copy_; }; } // namespace detail