2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-01-31 08:12:08 +00:00

support sleep_for()/sleep_until()

This commit is contained in:
Oliver Kowalke
2015-09-19 08:59:53 +02:00
parent 8d41c994af
commit 6da902ff09
7 changed files with 241 additions and 13 deletions

View File

@@ -8,6 +8,7 @@
#define BOOST_FIBERS_CONTEXT_H
#include <atomic>
#include <chrono>
#include <boost/assert.hpp>
#include <boost/config.hpp>
@@ -16,6 +17,7 @@
#include <boost/context/stack_context.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/intrusive/set.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/fiber/detail/config.hpp>
@@ -67,6 +69,14 @@ typedef intrusive::list_member_hook<
>
> ready_hook;
struct sleep_tag;
typedef intrusive::set_member_hook<
intrusive::tag< sleep_tag >,
intrusive::link_mode<
intrusive::auto_unlink
>
> sleep_hook;
struct terminated_tag;
typedef intrusive::list_member_hook<
intrusive::tag< terminated_tag >,
@@ -88,9 +98,11 @@ constexpr worker_context_t worker_context = worker_context_t();
class BOOST_FIBERS_DECL context {
public:
detail::ready_hook ready_hook_;
detail::terminated_hook terminated_hook_;
detail::wait_hook wait_hook_;
detail::ready_hook ready_hook_;
detail::sleep_hook sleep_hook_;
detail::terminated_hook terminated_hook_;
detail::wait_hook wait_hook_;
std::chrono::steady_clock::time_point tp_;
typedef intrusive::list<
context,
@@ -202,6 +214,7 @@ public:
ready_hook_(),
terminated_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
use_count_( 1), // fiber instance or scheduler owner
flags_( flag_worker_context),
scheduler_( nullptr),
@@ -238,6 +251,8 @@ public:
void yield() noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&);
bool is_main_context() const noexcept {
return 0 != ( flags_ & flag_main_context);
}
@@ -258,6 +273,8 @@ public:
bool ready_is_linked();
bool sleep_is_linked();
void wait_unlink();
friend void intrusive_ptr_add_ref( context * ctx) {

View File

@@ -0,0 +1,42 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_DETAIL_CONVERT_H
#define BOOST_FIBERS_DETAIL_CONVERT_H
#include <chrono>
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace detail {
inline
std::chrono::steady_clock::time_point convert(
std::chrono::steady_clock::time_point const& timeout_time) noexcept {
return timeout_time;
}
template< typename Clock, typename Duration >
std::chrono::steady_clock::time_point convert(
std::chrono::time_point< Clock, Duration > const& timeout_time) {
return std::chrono::steady_clock::now() + ( timeout_time - Clock::now() );
}
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_DETAIL_CONVERT_H

View File

@@ -6,10 +6,13 @@
#ifndef BOOST_THIS_FIBER_OPERATIONS_H
#define BOOST_THIS_FIBER_OPERATIONS_H
#include <chrono>
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -28,6 +31,23 @@ void yield() noexcept {
fibers::context::active()->yield();
}
template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
std::chrono::steady_clock::time_point sleep_time(
boost::fibers::detail::convert( sleep_time_) );
fibers::context::active()->wait_until( sleep_time);
// TODO: check if fiber was interrupted
//interruption_point();
}
template< typename Rep, typename Period >
void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
fibers::context::active()->wait_until(
std::chrono::steady_clock::now() + timeout_duration);
// TODO: check if fiber was interrupted
//interruption_point();
}
}}
#ifdef BOOST_HAS_ABI_HEADERS

View File

@@ -6,9 +6,12 @@
#ifndef BOOST_FIBERS_FIBER_MANAGER_H
#define BOOST_FIBERS_FIBER_MANAGER_H
#include <chrono>
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/set.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/autoreset_event.hpp>
@@ -23,20 +26,35 @@ namespace fibers {
class BOOST_FIBERS_DECL scheduler {
private:
struct timepoint_less {
bool operator()( context const& l, context const& r) {
return l.tp_ < r.tp_;
}
};
typedef intrusive::list<
context,
intrusive::member_hook<
context, detail::ready_hook, & context::ready_hook_ >,
intrusive::constant_time_size< false > > ready_queue_t;
typedef intrusive::set<
context,
intrusive::member_hook<
context, detail::sleep_hook, & context::sleep_hook_ >,
intrusive::constant_time_size< false >,
intrusive::compare< timepoint_less > > sleep_queue_t;
typedef intrusive::list<
context,
intrusive::member_hook<
context, detail::terminated_hook, & context::terminated_hook_ >,
context,
detail::terminated_hook,
& context::terminated_hook_ >,
intrusive::constant_time_size< false > > terminated_queue_t;
context * main_ctx_;
intrusive_ptr< context > dispatcher_ctx_;
ready_queue_t ready_queue_;
sleep_queue_t sleep_queue_;
terminated_queue_t terminated_queue_;
bool shutdown_;
detail::autoreset_event ready_queue_ev_;
@@ -47,6 +65,8 @@ private:
void release_terminated_();
void woken_up_() noexcept;
public:
scheduler() noexcept;
@@ -67,6 +87,8 @@ public:
void yield( context *) noexcept;
bool wait_until( context *, std::chrono::steady_clock::time_point const&);
void re_schedule( context *) noexcept;
};

View File

@@ -59,6 +59,7 @@ context::context( main_context_t) :
ready_hook_(),
terminated_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
use_count_( 1), // allocated on main- or thread-stack
flags_( flag_main_context),
scheduler_( nullptr),
@@ -72,6 +73,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall
ready_hook_(),
terminated_hook_(),
wait_hook_(),
tp_( (std::chrono::steady_clock::time_point::max)() ),
use_count_( 0), // scheduler will own dispatcher context
flags_( flag_dispatcher_context),
scheduler_( nullptr),
@@ -151,6 +153,14 @@ context::yield() noexcept {
scheduler_->yield( active_ctx);
}
bool
context::wait_until( std::chrono::steady_clock::time_point const& tp) {
BOOST_ASSERT( nullptr != scheduler_);
BOOST_ASSERT( this == active_);
return scheduler_->wait_until( this, tp);
}
bool
context::wait_is_linked() {
return wait_hook_.is_linked();
@@ -161,6 +171,11 @@ context::ready_is_linked() {
return ready_hook_.is_linked();
}
bool
context::sleep_is_linked() {
return sleep_hook_.is_linked();
}
void
context::wait_unlink() {
wait_hook_.unlink();

View File

@@ -56,6 +56,34 @@ scheduler::release_terminated_() {
}
}
void
scheduler::woken_up_() noexcept {
// move context which the deadline has reached
// to ready-queue
// sleep-queue is sorted (ascending)
std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
sleep_queue_t::iterator e = sleep_queue_.end();
for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) {
context * ctx = & ( * i);
BOOST_ASSERT( ! ctx->is_terminated() );
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
// set fiber to state_ready if deadline was reached
if ( ctx->tp_ <= now) {
// remove context from sleep-queue
i = sleep_queue_.erase( i);
// reset sleep-tp
ctx->tp_ = (std::chrono::steady_clock::time_point::max)();
// push new context to ready-queue
ready_queue_.push_back( * ctx);
} else {
break; // first element with ctx->tp_ > now, leave for-loop
}
}
}
scheduler::scheduler() noexcept :
main_ctx_( nullptr),
dispatcher_ctx_(),
@@ -103,15 +131,27 @@ scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noex
void
scheduler::dispatch() {
while ( ! shutdown_) {
// release termianted context'
release_terminated_();
context * ctx( nullptr);
// get sleeping context'
woken_up_();
context * ctx = nullptr;
// loop till we get next ready context
while ( nullptr == ( ctx = get_next_() ) ) {
// TODO: move context' from remote ready-queue to local ready-queue
// move ready context' from sleep-queue to ready-queue
//
// no ready context, wait till signaled
ready_queue_ev_.reset(
(std::chrono::steady_clock::time_point::max)());
// set deadline to highest value
std::chrono::steady_clock::time_point tp =
(std::chrono::steady_clock::time_point::max)();
// get lowest deadline from sleep-queue
sleep_queue_t::iterator i = sleep_queue_.begin();
if ( sleep_queue_.end() != i) {
tp = i->tp_;
}
ready_queue_ev_.reset( tp);
// get sleeping context'
woken_up_();
}
// push dispatcher context to ready-queue
// so that ready-queue never becomes empty
@@ -131,8 +171,9 @@ scheduler::dispatch() {
void
scheduler::set_ready( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ! ctx->is_terminated() );
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ! ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
// set the scheduler for new context
ctx->set_scheduler( this);
@@ -145,6 +186,7 @@ scheduler::set_terminated( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ctx->is_terminated() );
BOOST_ASSERT( ! ctx->ready_is_linked() );
BOOST_ASSERT( ! ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() );
terminated_queue_.push_back( * ctx);
}
@@ -154,15 +196,36 @@ scheduler::yield( context * active_ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( ! active_ctx->is_terminated() );
BOOST_ASSERT( ! active_ctx->ready_is_linked() );
BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
// push active context to ready-queue
ready_queue_.push_back( * active_ctx);
// resume another fiber
resume_( active_ctx, get_next_() );
}
bool
scheduler::wait_until( context * active_ctx,
std::chrono::steady_clock::time_point const& sleep_tp) {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( ! active_ctx->is_terminated() );
BOOST_ASSERT( ! active_ctx->ready_is_linked() );
BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
// push active context to sleep-queue
active_ctx->tp_ = sleep_tp;
sleep_queue_.insert( * active_ctx);
// resume another context
resume_( active_ctx, get_next_() );
// context has been resumed
// check if deadline has reached
return std::chrono::steady_clock::now() < sleep_tp;
}
void
scheduler::re_schedule( context * active_ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
// resume another context
resume_( active_ctx, get_next_() );
}

View File

@@ -4,6 +4,7 @@
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <sstream>
#include <string>
@@ -231,12 +232,58 @@ void test_yield() {
BOOST_CHECK_EQUAL( 8, v2);
}
void test_sleep_for() {
typedef std::chrono::system_clock Clock;
typedef Clock::time_point time_point;
std::chrono::milliseconds ms(500);
time_point t0 = Clock::now();
boost::this_fiber::sleep_for(ms);
time_point t1 = Clock::now();
std::chrono::nanoseconds ns = (t1 - t0) - ms;
std::chrono::nanoseconds err = ms / 100;
// The time slept is within 1% of 500ms
// This test is spurious as it depends on the time the fiber system switches the fiber
BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count());
//BOOST_TEST(std::abs(static_cast<long>(ns.count())) < (err+std::chrono::milliseconds(1000)).count());
}
void test_sleep_until() {
{
typedef std::chrono::steady_clock Clock;
typedef Clock::time_point time_point;
std::chrono::milliseconds ms(500);
time_point t0 = Clock::now();
boost::this_fiber::sleep_until(t0 + ms);
time_point t1 = Clock::now();
std::chrono::nanoseconds ns = (t1 - t0) - ms;
std::chrono::nanoseconds err = ms / 100;
// The time slept is within 1% of 500ms
// This test is spurious as it depends on the time the thread system switches the threads
BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count());
//BOOST_TEST(std::abs(static_cast<long>(ns.count())) < (err+std::chrono::milliseconds(1000)).count());
}
{
typedef std::chrono::system_clock Clock;
typedef Clock::time_point time_point;
std::chrono::milliseconds ms(500);
time_point t0 = Clock::now();
boost::this_fiber::sleep_until(t0 + ms);
time_point t1 = Clock::now();
std::chrono::nanoseconds ns = (t1 - t0) - ms;
std::chrono::nanoseconds err = ms / 100;
// The time slept is within 1% of 500ms
// This test is spurious as it depends on the time the thread system switches the threads
BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count());
//BOOST_TEST(std::abs(static_cast<long>(ns.count())) < (err+std::chrono::milliseconds(1000)).count());
}
}
boost::unit_test::test_suite * init_unit_test_suite( int, char* []) {
boost::unit_test::test_suite * test =
BOOST_TEST_SUITE("Boost.Fiber: fiber test suite");
test->add( BOOST_TEST_CASE( & test_scheduler_dtor) );
test->add( BOOST_TEST_CASE( & test_join_fn) );
test->add( BOOST_TEST_CASE( & test_scheduler_dtor) );
test->add( BOOST_TEST_CASE( & test_join_fn) );
test->add( BOOST_TEST_CASE( & test_join_memfn) );
test->add( BOOST_TEST_CASE( & test_join_copyable) );
test->add( BOOST_TEST_CASE( & test_join_moveable) );
@@ -244,6 +291,8 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* []) {
test->add( BOOST_TEST_CASE( & test_move_fiber) );
test->add( BOOST_TEST_CASE( & test_move_fiber) );
test->add( BOOST_TEST_CASE( & test_yield) );
test->add( BOOST_TEST_CASE( & test_sleep_for) );
test->add( BOOST_TEST_CASE( & test_sleep_until) );
return test;
}