mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-12 12:02:54 +00:00
condition uses notify interface for notifications
This commit is contained in:
@@ -21,7 +21,7 @@
|
||||
#include <boost/utility.hpp>
|
||||
|
||||
#include <boost/fiber/detail/config.hpp>
|
||||
#include <boost/fiber/detail/fiber_base.hpp>
|
||||
#include <boost/fiber/detail/notify.hpp>
|
||||
#include <boost/fiber/detail/scheduler.hpp>
|
||||
#include <boost/fiber/detail/spinlock.hpp>
|
||||
#include <boost/fiber/exceptions.hpp>
|
||||
@@ -38,24 +38,16 @@
|
||||
# pragma warning(disable:4355 4251 4275)
|
||||
# endif
|
||||
|
||||
#include <cstdio>
|
||||
|
||||
namespace boost {
|
||||
namespace fibers {
|
||||
|
||||
class BOOST_FIBERS_DECL condition : private noncopyable
|
||||
{
|
||||
private:
|
||||
enum command
|
||||
{
|
||||
SLEEPING = 0,
|
||||
NOTIFY_ONE,
|
||||
NOTIFY_ALL
|
||||
};
|
||||
|
||||
atomic< command > cmd_;
|
||||
atomic< std::size_t > waiters_;
|
||||
mutex enter_mtx_;
|
||||
detail::spinlock waiting_mtx_;
|
||||
std::deque< detail::fiber_base::ptr_t > waiting_;
|
||||
detail::spinlock waiting_mtx_;
|
||||
std::deque< detail::notify::ptr_t > waiting_;
|
||||
|
||||
public:
|
||||
condition();
|
||||
@@ -76,249 +68,53 @@ public:
|
||||
template< typename LockType >
|
||||
void wait( LockType & lt)
|
||||
{
|
||||
detail::fiber_base::ptr_t f( detail::scheduler::instance().active() );
|
||||
detail::notify::ptr_t n( detail::scheduler::instance().active() );
|
||||
try
|
||||
{
|
||||
unique_lock< mutex > lk( enter_mtx_);
|
||||
BOOST_ASSERT( lk);
|
||||
if ( n)
|
||||
{
|
||||
// store this fiber in order to be notified later
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
BOOST_ASSERT( lk);
|
||||
waiting_.push_back( f);
|
||||
++waiters_;
|
||||
waiting_.push_back( n);
|
||||
lt.unlock();
|
||||
|
||||
// suspend fiber
|
||||
detail::scheduler::instance().wait( lk);
|
||||
|
||||
// check if fiber was interrupted
|
||||
this_fiber::interruption_point();
|
||||
}
|
||||
lt.unlock();
|
||||
}
|
||||
|
||||
bool unlock_enter_mtx = false;
|
||||
|
||||
//Loop until a notification indicates that the fiber should exit
|
||||
for (;;)
|
||||
{
|
||||
//The fiber sleeps/spins until a spin_condition commands a notification
|
||||
//Notification occurred, we will lock the checking mutex so that
|
||||
while ( SLEEPING == cmd_)
|
||||
{
|
||||
try
|
||||
{
|
||||
if ( f)
|
||||
{
|
||||
// check if fiber was interrupted
|
||||
this_fiber::interruption_point();
|
||||
|
||||
detail::scheduler::instance().wait();
|
||||
|
||||
// check if fiber was interrupted
|
||||
this_fiber::interruption_point();
|
||||
}
|
||||
else
|
||||
{
|
||||
// run scheduler
|
||||
detail::scheduler::instance().run();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// FIXME: use multi-index container
|
||||
// remove fiber from waiting_
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
waiting_.erase(
|
||||
std::find( waiting_.begin(), waiting_.end(), f) );
|
||||
--waiters_;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
command expected = NOTIFY_ONE;
|
||||
cmd_.compare_exchange_strong( expected, SLEEPING);
|
||||
if ( SLEEPING == expected)
|
||||
//Other fiber has been notified and since it was a NOTIFY one
|
||||
//command, this fiber must sleep again
|
||||
continue;
|
||||
else if ( NOTIFY_ONE == expected)
|
||||
{
|
||||
//If it was a NOTIFY_ONE command, only this fiber should
|
||||
//exit. This fiber has atomically marked command as sleep before
|
||||
//so no other fiber will exit.
|
||||
//Decrement wait count.
|
||||
--waiters_;
|
||||
unlock_enter_mtx = true;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
//If it is a NOTIFY_ALL command, all fibers should return
|
||||
//from do_timed_wait function. Decrement wait count.
|
||||
// notifier for main-fiber
|
||||
n = detail::scheduler::instance().notifier();
|
||||
// store this fiber in order to be notified later
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
unlock_enter_mtx = 0 == --waiters_;
|
||||
waiting_.push_back( n);
|
||||
lt.unlock();
|
||||
|
||||
lk.unlock();
|
||||
//Check if this is the last fiber of notify_all waiters
|
||||
//Only the last fiber will release the mutex
|
||||
if ( unlock_enter_mtx)
|
||||
{
|
||||
expected = NOTIFY_ALL;
|
||||
cmd_.compare_exchange_strong( expected, SLEEPING);
|
||||
}
|
||||
break;
|
||||
while ( ! n->woken_up() )
|
||||
{
|
||||
fprintf(stdout, "condition: main-fiber not woken-up\n");
|
||||
// run scheduler
|
||||
detail::scheduler::instance().run();
|
||||
}
|
||||
fprintf(stdout, "condition: main-fiber woken-up\n");
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// remove fiber from waiting_
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
waiting_.erase(
|
||||
std::find( waiting_.begin(), waiting_.end(), n) );
|
||||
throw;
|
||||
}
|
||||
|
||||
//Unlock the enter mutex if it is a single notification, if this is
|
||||
//the last notified fiber in a notify_all or a timeout has occurred
|
||||
if ( unlock_enter_mtx)
|
||||
enter_mtx_.unlock();
|
||||
|
||||
//Lock external again before returning from the method
|
||||
// lock external again before returning
|
||||
lt.lock();
|
||||
}
|
||||
#if 0
|
||||
template< typename LockType, typename TimeDuration >
|
||||
bool timed_wait( LockType & lt, TimeDuration const& dt)
|
||||
{ return timed_wait( lt, chrono::system_clock::now() + dt); }
|
||||
|
||||
template< typename LockType, typename TimeDuration, typename Pred >
|
||||
bool timed_wait( LockType & lt, TimeDuration const& dt, Pred pred)
|
||||
{ return timed_wait( lt, chrono::system_clock::now() + dt, pred); }
|
||||
|
||||
template< typename LockType, typename Pred >
|
||||
bool timed_wait( LockType & lt, chrono::system_clock::time_point const& abs_time, Pred pred)
|
||||
{
|
||||
while ( ! pred() )
|
||||
if ( ! timed_wait( lt, abs_time) )
|
||||
return pred();
|
||||
return true;
|
||||
}
|
||||
|
||||
template< typename LockType >
|
||||
bool timed_wait( LockType & lt, chrono::system_clock::time_point const& abs_time)
|
||||
{
|
||||
if ( (chrono::system_clock::time_point::max)() == abs_time){
|
||||
wait( lt);
|
||||
return true;
|
||||
}
|
||||
chrono::system_clock::time_point now( chrono::system_clock::now() );
|
||||
if ( now >= abs_time) return false;
|
||||
|
||||
{
|
||||
mutex::scoped_lock lk( enter_mtx_); // FIXME: abs_time!
|
||||
BOOST_ASSERT( lk);
|
||||
++waiters_;
|
||||
lt.unlock();
|
||||
}
|
||||
|
||||
bool unlock_enter_mtx = false, timed_out = false;
|
||||
//Loop until a notification indicates that the fiber should
|
||||
//exit or timeout occurs
|
||||
for (;;)
|
||||
{
|
||||
//The fiber sleeps/spins until a spin_condition commands a notification
|
||||
//Notification occurred, we will lock the checking mutex so that
|
||||
while ( SLEEPING == cmd_)
|
||||
{
|
||||
detail::fiber_base::ptr_t f( detail::scheduler::instance().active() );
|
||||
try
|
||||
{
|
||||
if ( f)
|
||||
{
|
||||
// check if fiber was interrupted
|
||||
this_fiber::interruption_point();
|
||||
|
||||
this_fiber::yield();
|
||||
|
||||
// check if fiber was interrupted
|
||||
this_fiber::interruption_point();
|
||||
}
|
||||
else
|
||||
{
|
||||
// store a dummy fiber
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
waiting_.push_back( f);
|
||||
// run scheduler
|
||||
run();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
--waiters_;
|
||||
// FIXME: use multi-index container
|
||||
// remove fiber from waiting_
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
waiting_.erase(
|
||||
std::find( waiting_.ebgin(), waiting_.end(), f) );
|
||||
throw;
|
||||
}
|
||||
|
||||
now = chrono::system_clock::now();
|
||||
if ( now >= abs_time)
|
||||
{
|
||||
//If we can lock the mutex it means that no notification
|
||||
//is being executed in this spin_condition variable
|
||||
timed_out = enter_mtx_.try_lock();
|
||||
|
||||
//If locking fails, indicates that another fiber is executing
|
||||
//notification, so we play the notification game
|
||||
if ( ! timed_out)
|
||||
//There is an ongoing notification, we will try again later
|
||||
continue;
|
||||
|
||||
//No notification in execution, since enter mutex is locked.
|
||||
//We will execute time-out logic, so we will decrement count,
|
||||
//release the enter mutex and return false.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//If a timeout occurred, the mutex will not execute checking logic
|
||||
if ( timed_out)
|
||||
{
|
||||
unlock_enter_mtx = true;
|
||||
--waiters_;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
command expected = NOTIFY_ONE;
|
||||
cmd_.compare_exchange_strong( expected, SLEEPING);
|
||||
if ( SLEEPING == expected)
|
||||
//Other fiber has been notified and since it was a NOTIFY one
|
||||
//command, this fiber must sleep again
|
||||
continue;
|
||||
else if ( NOTIFY_ONE == expected)
|
||||
{
|
||||
//If it was a NOTIFY_ONE command, only this fiber should
|
||||
//exit. This fiber has atomically marked command as sleep before
|
||||
//so no other fiber will exit.
|
||||
//Decrement wait count.
|
||||
unlock_enter_mtx = true;
|
||||
--waiters_;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
//If it is a NOTIFY_ALL command, all fibers should return
|
||||
//from do_timed_wait function. Decrement wait count.
|
||||
unlock_enter_mtx = 0 == --waiters_;
|
||||
//Check if this is the last fiber of notify_all waiters
|
||||
//Only the last fiber will release the mutex
|
||||
if ( unlock_enter_mtx)
|
||||
{
|
||||
expected = NOTIFY_ALL;
|
||||
cmd_.compare_exchange_strong( expected, SLEEPING);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Unlock the enter mutex if it is a single notification, if this is
|
||||
//the last notified fiber in a notify_all or a timeout has occurred
|
||||
if ( unlock_enter_mtx)
|
||||
enter_mtx_.unlock();
|
||||
|
||||
//Lock external again before returning from the method
|
||||
lt.lock();
|
||||
return ! timed_out;
|
||||
}
|
||||
#endif
|
||||
};
|
||||
|
||||
typedef condition condition_variable;
|
||||
|
||||
@@ -22,9 +22,6 @@ namespace boost {
|
||||
namespace fibers {
|
||||
|
||||
condition::condition() :
|
||||
cmd_( SLEEPING),
|
||||
waiters_( 0),
|
||||
enter_mtx_(),
|
||||
waiting_mtx_(),
|
||||
waiting_()
|
||||
{}
|
||||
@@ -35,68 +32,31 @@ condition::~condition()
|
||||
void
|
||||
condition::notify_one()
|
||||
{
|
||||
// This mutex guarantees that no other thread can enter to the
|
||||
// wait method logic, so that thread count will be
|
||||
// constant until the function writes a NOTIFY_ONE command.
|
||||
// It also guarantees that no other notification can be signaled
|
||||
// on this condition before this one ends
|
||||
enter_mtx_.lock();
|
||||
detail::notify::ptr_t n;
|
||||
|
||||
// Return if there are no waiters
|
||||
if ( waiting_.empty() )
|
||||
{
|
||||
BOOST_ASSERT( 0 == waiters_);
|
||||
enter_mtx_.unlock();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( waiting_.front() )
|
||||
waiting_.front()->wake_up();
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
if ( ! waiting_.empty() ) {
|
||||
n.swap( waiting_.front() );
|
||||
waiting_.pop_front();
|
||||
}
|
||||
lk.unlock();
|
||||
|
||||
// Notify that all fibers should execute wait logic
|
||||
command expected = SLEEPING;
|
||||
while ( ! cmd_.compare_exchange_strong( expected, NOTIFY_ONE) )
|
||||
{
|
||||
// if ( this_fiber::is_fiberized() )
|
||||
// this_fiber::yield();
|
||||
expected = SLEEPING;
|
||||
}
|
||||
if ( n)
|
||||
n->wake_up();
|
||||
}
|
||||
|
||||
void
|
||||
condition::notify_all()
|
||||
{
|
||||
// This mutex guarantees that no other thread can enter to the
|
||||
// wait method logic, so that thread count will be
|
||||
// constant until the function writes a NOTIFY_ALL command.
|
||||
// It also guarantees that no other notification can be signaled
|
||||
// on this condition before this one ends
|
||||
enter_mtx_.lock();
|
||||
std::deque< detail::notify::ptr_t > waiting;
|
||||
|
||||
// Return if there are no waiters
|
||||
if ( waiting_.empty() )
|
||||
{
|
||||
BOOST_ASSERT( 0 == waiters_);
|
||||
enter_mtx_.unlock();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
BOOST_FOREACH( detail::fiber_base::ptr_t const& f, waiting_)
|
||||
{ if ( f) f->wake_up(); }
|
||||
waiting_.clear();
|
||||
}
|
||||
unique_lock< detail::spinlock > lk( waiting_mtx_);
|
||||
waiting.swap( waiting_);
|
||||
lk.unlock();
|
||||
|
||||
// Notify that all fibers should execute wait logic
|
||||
command expected = SLEEPING;
|
||||
while ( SLEEPING != cmd_.compare_exchange_strong( expected, NOTIFY_ALL) )
|
||||
BOOST_FOREACH( detail::notify::ptr_t const& n, waiting)
|
||||
{
|
||||
// if ( this_fiber::is_fiberized() )
|
||||
// this_fiber::yield();
|
||||
expected = SLEEPING;
|
||||
n->wake_up();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user