From 6da902ff09438be8ccd94641de5a457f2ce76bd7 Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sat, 19 Sep 2015 08:59:53 +0200 Subject: [PATCH] support sleep_for()/sleep_until() --- include/boost/fiber/context.hpp | 23 ++++++-- include/boost/fiber/detail/convert.hpp | 42 +++++++++++++++ include/boost/fiber/operations.hpp | 22 +++++++- include/boost/fiber/scheduler.hpp | 26 ++++++++- src/context.cpp | 15 ++++++ src/scheduler.cpp | 73 ++++++++++++++++++++++++-- test/test_fiber.cpp | 53 ++++++++++++++++++- 7 files changed, 241 insertions(+), 13 deletions(-) create mode 100644 include/boost/fiber/detail/convert.hpp diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp index 0b79be36..8af5049d 100644 --- a/include/boost/fiber/context.hpp +++ b/include/boost/fiber/context.hpp @@ -8,6 +8,7 @@ #define BOOST_FIBERS_CONTEXT_H #include +#include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include #include #include @@ -67,6 +69,14 @@ typedef intrusive::list_member_hook< > > ready_hook; +struct sleep_tag; +typedef intrusive::set_member_hook< + intrusive::tag< sleep_tag >, + intrusive::link_mode< + intrusive::auto_unlink + > +> sleep_hook; + struct terminated_tag; typedef intrusive::list_member_hook< intrusive::tag< terminated_tag >, @@ -88,9 +98,11 @@ constexpr worker_context_t worker_context = worker_context_t(); class BOOST_FIBERS_DECL context { public: - detail::ready_hook ready_hook_; - detail::terminated_hook terminated_hook_; - detail::wait_hook wait_hook_; + detail::ready_hook ready_hook_; + detail::sleep_hook sleep_hook_; + detail::terminated_hook terminated_hook_; + detail::wait_hook wait_hook_; + std::chrono::steady_clock::time_point tp_; typedef intrusive::list< context, @@ -202,6 +214,7 @@ public: ready_hook_(), terminated_hook_(), wait_hook_(), + tp_( (std::chrono::steady_clock::time_point::max)() ), use_count_( 1), // fiber instance or scheduler owner flags_( flag_worker_context), scheduler_( nullptr), @@ -238,6 +251,8 @@ public: void yield() noexcept; + bool wait_until( std::chrono::steady_clock::time_point const&); + bool is_main_context() const noexcept { return 0 != ( flags_ & flag_main_context); } @@ -258,6 +273,8 @@ public: bool ready_is_linked(); + bool sleep_is_linked(); + void wait_unlink(); friend void intrusive_ptr_add_ref( context * ctx) { diff --git a/include/boost/fiber/detail/convert.hpp b/include/boost/fiber/detail/convert.hpp new file mode 100644 index 00000000..1620bba7 --- /dev/null +++ b/include/boost/fiber/detail/convert.hpp @@ -0,0 +1,42 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_DETAIL_CONVERT_H +#define BOOST_FIBERS_DETAIL_CONVERT_H + +#include + +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +inline +std::chrono::steady_clock::time_point convert( + std::chrono::steady_clock::time_point const& timeout_time) noexcept { + return timeout_time; +} + +template< typename Clock, typename Duration > +std::chrono::steady_clock::time_point convert( + std::chrono::time_point< Clock, Duration > const& timeout_time) { + return std::chrono::steady_clock::now() + ( timeout_time - Clock::now() ); +} + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAIL_CONVERT_H diff --git a/include/boost/fiber/operations.hpp b/include/boost/fiber/operations.hpp index 8e625677..45812c59 100644 --- a/include/boost/fiber/operations.hpp +++ b/include/boost/fiber/operations.hpp @@ -6,10 +6,13 @@ #ifndef BOOST_THIS_FIBER_OPERATIONS_H #define BOOST_THIS_FIBER_OPERATIONS_H +#include + #include -#include #include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -28,6 +31,23 @@ void yield() noexcept { fibers::context::active()->yield(); } +template< typename Clock, typename Duration > +void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) { + std::chrono::steady_clock::time_point sleep_time( + boost::fibers::detail::convert( sleep_time_) ); + fibers::context::active()->wait_until( sleep_time); + // TODO: check if fiber was interrupted + //interruption_point(); +} + +template< typename Rep, typename Period > +void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) { + fibers::context::active()->wait_until( + std::chrono::steady_clock::now() + timeout_duration); + // TODO: check if fiber was interrupted + //interruption_point(); +} + }} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp index fd8cc849..74c630bb 100644 --- a/include/boost/fiber/scheduler.hpp +++ b/include/boost/fiber/scheduler.hpp @@ -6,9 +6,12 @@ #ifndef BOOST_FIBERS_FIBER_MANAGER_H #define BOOST_FIBERS_FIBER_MANAGER_H +#include + #include -#include #include +#include +#include #include #include @@ -23,20 +26,35 @@ namespace fibers { class BOOST_FIBERS_DECL scheduler { private: + struct timepoint_less { + bool operator()( context const& l, context const& r) { + return l.tp_ < r.tp_; + } + }; + typedef intrusive::list< context, intrusive::member_hook< context, detail::ready_hook, & context::ready_hook_ >, intrusive::constant_time_size< false > > ready_queue_t; + typedef intrusive::set< + context, + intrusive::member_hook< + context, detail::sleep_hook, & context::sleep_hook_ >, + intrusive::constant_time_size< false >, + intrusive::compare< timepoint_less > > sleep_queue_t; typedef intrusive::list< context, intrusive::member_hook< - context, detail::terminated_hook, & context::terminated_hook_ >, + context, + detail::terminated_hook, + & context::terminated_hook_ >, intrusive::constant_time_size< false > > terminated_queue_t; context * main_ctx_; intrusive_ptr< context > dispatcher_ctx_; ready_queue_t ready_queue_; + sleep_queue_t sleep_queue_; terminated_queue_t terminated_queue_; bool shutdown_; detail::autoreset_event ready_queue_ev_; @@ -47,6 +65,8 @@ private: void release_terminated_(); + void woken_up_() noexcept; + public: scheduler() noexcept; @@ -67,6 +87,8 @@ public: void yield( context *) noexcept; + bool wait_until( context *, std::chrono::steady_clock::time_point const&); + void re_schedule( context *) noexcept; }; diff --git a/src/context.cpp b/src/context.cpp index 82be3f48..73eb909c 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -59,6 +59,7 @@ context::context( main_context_t) : ready_hook_(), terminated_hook_(), wait_hook_(), + tp_( (std::chrono::steady_clock::time_point::max)() ), use_count_( 1), // allocated on main- or thread-stack flags_( flag_main_context), scheduler_( nullptr), @@ -72,6 +73,7 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall ready_hook_(), terminated_hook_(), wait_hook_(), + tp_( (std::chrono::steady_clock::time_point::max)() ), use_count_( 0), // scheduler will own dispatcher context flags_( flag_dispatcher_context), scheduler_( nullptr), @@ -151,6 +153,14 @@ context::yield() noexcept { scheduler_->yield( active_ctx); } +bool +context::wait_until( std::chrono::steady_clock::time_point const& tp) { + BOOST_ASSERT( nullptr != scheduler_); + BOOST_ASSERT( this == active_); + + return scheduler_->wait_until( this, tp); +} + bool context::wait_is_linked() { return wait_hook_.is_linked(); @@ -161,6 +171,11 @@ context::ready_is_linked() { return ready_hook_.is_linked(); } +bool +context::sleep_is_linked() { + return sleep_hook_.is_linked(); +} + void context::wait_unlink() { wait_hook_.unlink(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index e9f0d80f..32c4cde4 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -56,6 +56,34 @@ scheduler::release_terminated_() { } } +void +scheduler::woken_up_() noexcept { + // move context which the deadline has reached + // to ready-queue + // sleep-queue is sorted (ascending) + std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + sleep_queue_t::iterator e = sleep_queue_.end(); + for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) { + context * ctx = & ( * i); + BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ctx->sleep_is_linked() ); + BOOST_ASSERT( ! ctx->wait_is_linked() ); + // set fiber to state_ready if deadline was reached + if ( ctx->tp_ <= now) { + // remove context from sleep-queue + i = sleep_queue_.erase( i); + // reset sleep-tp + ctx->tp_ = (std::chrono::steady_clock::time_point::max)(); + // push new context to ready-queue + ready_queue_.push_back( * ctx); + } else { + break; // first element with ctx->tp_ > now, leave for-loop + } + } +} + scheduler::scheduler() noexcept : main_ctx_( nullptr), dispatcher_ctx_(), @@ -103,15 +131,27 @@ scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noex void scheduler::dispatch() { while ( ! shutdown_) { + // release termianted context' release_terminated_(); - context * ctx( nullptr); + // get sleeping context' + woken_up_(); + context * ctx = nullptr; // loop till we get next ready context while ( nullptr == ( ctx = get_next_() ) ) { // TODO: move context' from remote ready-queue to local ready-queue - // move ready context' from sleep-queue to ready-queue + // // no ready context, wait till signaled - ready_queue_ev_.reset( - (std::chrono::steady_clock::time_point::max)()); + // set deadline to highest value + std::chrono::steady_clock::time_point tp = + (std::chrono::steady_clock::time_point::max)(); + // get lowest deadline from sleep-queue + sleep_queue_t::iterator i = sleep_queue_.begin(); + if ( sleep_queue_.end() != i) { + tp = i->tp_; + } + ready_queue_ev_.reset( tp); + // get sleeping context' + woken_up_(); } // push dispatcher context to ready-queue // so that ready-queue never becomes empty @@ -131,8 +171,9 @@ scheduler::dispatch() { void scheduler::set_ready( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( ! ctx->ready_is_linked() ); BOOST_ASSERT( ! ctx->is_terminated() ); + BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); // set the scheduler for new context ctx->set_scheduler( this); @@ -145,6 +186,7 @@ scheduler::set_terminated( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ctx->is_terminated() ); BOOST_ASSERT( ! ctx->ready_is_linked() ); + BOOST_ASSERT( ! ctx->sleep_is_linked() ); BOOST_ASSERT( ! ctx->wait_is_linked() ); terminated_queue_.push_back( * ctx); } @@ -154,15 +196,36 @@ scheduler::yield( context * active_ctx) noexcept { BOOST_ASSERT( nullptr != active_ctx); BOOST_ASSERT( ! active_ctx->is_terminated() ); BOOST_ASSERT( ! active_ctx->ready_is_linked() ); + BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); BOOST_ASSERT( ! active_ctx->wait_is_linked() ); // push active context to ready-queue ready_queue_.push_back( * active_ctx); + // resume another fiber resume_( active_ctx, get_next_() ); } +bool +scheduler::wait_until( context * active_ctx, + std::chrono::steady_clock::time_point const& sleep_tp) { + BOOST_ASSERT( nullptr != active_ctx); + BOOST_ASSERT( ! active_ctx->is_terminated() ); + BOOST_ASSERT( ! active_ctx->ready_is_linked() ); + BOOST_ASSERT( ! active_ctx->sleep_is_linked() ); + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); + // push active context to sleep-queue + active_ctx->tp_ = sleep_tp; + sleep_queue_.insert( * active_ctx); + // resume another context + resume_( active_ctx, get_next_() ); + // context has been resumed + // check if deadline has reached + return std::chrono::steady_clock::now() < sleep_tp; +} + void scheduler::re_schedule( context * active_ctx) noexcept { BOOST_ASSERT( nullptr != active_ctx); + // resume another context resume_( active_ctx, get_next_() ); } diff --git a/test/test_fiber.cpp b/test/test_fiber.cpp index bde725ae..9976cb90 100644 --- a/test/test_fiber.cpp +++ b/test/test_fiber.cpp @@ -4,6 +4,7 @@ // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) +#include #include #include @@ -231,12 +232,58 @@ void test_yield() { BOOST_CHECK_EQUAL( 8, v2); } +void test_sleep_for() { + typedef std::chrono::system_clock Clock; + typedef Clock::time_point time_point; + std::chrono::milliseconds ms(500); + time_point t0 = Clock::now(); + boost::this_fiber::sleep_for(ms); + time_point t1 = Clock::now(); + std::chrono::nanoseconds ns = (t1 - t0) - ms; + std::chrono::nanoseconds err = ms / 100; + // The time slept is within 1% of 500ms + // This test is spurious as it depends on the time the fiber system switches the fiber + BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count()); + //BOOST_TEST(std::abs(static_cast(ns.count())) < (err+std::chrono::milliseconds(1000)).count()); +} + +void test_sleep_until() { + { + typedef std::chrono::steady_clock Clock; + typedef Clock::time_point time_point; + std::chrono::milliseconds ms(500); + time_point t0 = Clock::now(); + boost::this_fiber::sleep_until(t0 + ms); + time_point t1 = Clock::now(); + std::chrono::nanoseconds ns = (t1 - t0) - ms; + std::chrono::nanoseconds err = ms / 100; + // The time slept is within 1% of 500ms + // This test is spurious as it depends on the time the thread system switches the threads + BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count()); + //BOOST_TEST(std::abs(static_cast(ns.count())) < (err+std::chrono::milliseconds(1000)).count()); + } + { + typedef std::chrono::system_clock Clock; + typedef Clock::time_point time_point; + std::chrono::milliseconds ms(500); + time_point t0 = Clock::now(); + boost::this_fiber::sleep_until(t0 + ms); + time_point t1 = Clock::now(); + std::chrono::nanoseconds ns = (t1 - t0) - ms; + std::chrono::nanoseconds err = ms / 100; + // The time slept is within 1% of 500ms + // This test is spurious as it depends on the time the thread system switches the threads + BOOST_CHECK((std::max)(ns.count(), -ns.count()) < (err+std::chrono::milliseconds(1000)).count()); + //BOOST_TEST(std::abs(static_cast(ns.count())) < (err+std::chrono::milliseconds(1000)).count()); + } +} + boost::unit_test::test_suite * init_unit_test_suite( int, char* []) { boost::unit_test::test_suite * test = BOOST_TEST_SUITE("Boost.Fiber: fiber test suite"); - test->add( BOOST_TEST_CASE( & test_scheduler_dtor) ); - test->add( BOOST_TEST_CASE( & test_join_fn) ); + test->add( BOOST_TEST_CASE( & test_scheduler_dtor) ); + test->add( BOOST_TEST_CASE( & test_join_fn) ); test->add( BOOST_TEST_CASE( & test_join_memfn) ); test->add( BOOST_TEST_CASE( & test_join_copyable) ); test->add( BOOST_TEST_CASE( & test_join_moveable) ); @@ -244,6 +291,8 @@ boost::unit_test::test_suite * init_unit_test_suite( int, char* []) { test->add( BOOST_TEST_CASE( & test_move_fiber) ); test->add( BOOST_TEST_CASE( & test_move_fiber) ); test->add( BOOST_TEST_CASE( & test_yield) ); + test->add( BOOST_TEST_CASE( & test_sleep_for) ); + test->add( BOOST_TEST_CASE( & test_sleep_until) ); return test; }