From 3319841788d7a2e59d70c115eb459d46d1fdb3c1 Mon Sep 17 00:00:00 2001
From: Oliver Kowalke
Date: Thu, 3 Dec 2015 17:57:00 +0100
Subject: [PATCH] use spinlock_lock+context instead of function<>

---
 examples/work_sharing.cpp         |  10 +--
 include/boost/fiber/condition.hpp |  75 +++++++++++-----------
 include/boost/fiber/context.hpp   |  35 +++++++----
 include/boost/fiber/scheduler.hpp |  11 +++-
 src/context.cpp                   |  73 ++++++++++++++++++----
 src/mutex.cpp                     |   5 +-
 src/recursive_mutex.cpp           |   5 +-
 src/recursive_timed_mutex.cpp     |  10 +--
 src/scheduler.cpp                 | 100 ++++++++++++++++++++++++------
 src/timed_mutex.cpp               |  10 +--
 10 files changed, 222 insertions(+), 112 deletions(-)

diff --git a/examples/work_sharing.cpp b/examples/work_sharing.cpp
index 4ec826eb..4191cfba 100644
--- a/examples/work_sharing.cpp
+++ b/examples/work_sharing.cpp
@@ -80,11 +80,13 @@ public:
             BOOST_ASSERT( nullptr != ctx);
             // attach context to current scheduler
             boost::fibers::context::active()->migrate( ctx);
-        } else if ( ! local_queue_.empty() ) {
+        } else {
             lk.unlock();
-            // nothing in the ready queue, return dispatcher_ctx_
-            ctx = local_queue_.front();
-            local_queue_.pop();
+            if ( ! local_queue_.empty() ) {
+                // nothing in the ready queue, return dispatcher_ctx_
+                ctx = local_queue_.front();
+                local_queue_.pop();
+            }
         }
         return ctx;
     }
diff --git a/include/boost/fiber/condition.hpp b/include/boost/fiber/condition.hpp
index 6f4115af..d25cbf94 100644
--- a/include/boost/fiber/condition.hpp
+++ b/include/boost/fiber/condition.hpp
@@ -69,33 +69,34 @@ public:
         // check if context was interrupted
         this_fiber::interruption_point();
         context * ctx = context::active();
-        typename LockType::mutex_type * mtx = lt.mutex();
-        if ( ctx != mtx->owner_) {
-            throw lock_error( static_cast< int >( std::errc::operation_not_permitted),
-                              "boost fiber: no privilege to perform the operation");
-        }
+        // pre-conditions
+        BOOST_ASSERT( lt.owns_lock() && ctx == lt.mutex()->owner_);
+        // atomically call lt.unlock() and block on *this
         // store this fiber in waiting-queue
-        // in order notify (resume) this fiber later
         detail::spinlock_lock lk( wait_queue_splk_);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        // unlock external
+        // unlock external lt
         lt.unlock();
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        ctx->suspend( & func);
+        ctx->suspend( lk);
+        // relock local lk
+        lk.lock();
+        // remove from waiting-queue
+        ctx->wait_unlink();
+        // unlock local lk
+        lk.unlock();
+        // relock external again before returning
         try {
-            // check if context was interrupted
-            this_fiber::interruption_point();
-        } catch ( ... ) {
-            ctx->wait_unlink();
-            throw;
+            lt.lock();
+        } catch (...) {
+            std::terminate();
         }
+        // post-conditions
+        BOOST_ASSERT( lt.owns_lock() && ctx == lt.mutex()->owner_);
+        BOOST_ASSERT( ! ctx->wait_is_linked() );
-        // lock external again before returning
-        lt.lock();
+        // check if context was interrupted
+        this_fiber::interruption_point();
     }

     template< typename LockType, typename Clock, typename Duration >
@@ -106,36 +107,36 @@ public:
         std::chrono::steady_clock::time_point timeout_time(
             detail::convert( timeout_time_) );
         context * ctx = context::active();
-        typename LockType::mutex_type * mtx = lt.mutex();
-        if ( ctx != mtx->owner_) {
-            throw lock_error( static_cast< int >( std::errc::operation_not_permitted),
-                              "boost fiber: no privilege to perform the operation");
-        }
+        // pre-conditions
+        BOOST_ASSERT( lt.owns_lock() && ctx == lt.mutex()->owner_);
+        // atomically call lt.unlock() and block on *this
         // store this fiber in waiting-queue
-        // in order notify (resume) this fiber later
         detail::spinlock_lock lk( wait_queue_splk_);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        // unlock external
+        // unlock external lt
         lt.unlock();
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        if ( ! ctx->wait_until( timeout_time, & func) ) {
+        if ( ! ctx->wait_until( timeout_time, lk) ) {
             status = cv_status::timeout;
-            ctx->wait_unlink();
         }
+        // relock local lk
+        lk.lock();
+        // remove from waiting-queue
+        ctx->wait_unlink();
+        // unlock local lk
+        lk.unlock();
+        // relock external again before returning
         try {
-            // check if context was interrupted
-            this_fiber::interruption_point();
-        } catch ( ... ) {
-            ctx->wait_unlink();
-            throw;
+            lt.lock();
+        } catch (...) {
+            std::terminate();
         }
+        // post-conditions
+        BOOST_ASSERT( lt.owns_lock() && ctx == lt.mutex()->owner_);
+        BOOST_ASSERT( ! ctx->wait_is_linked() );
-        // lock external again before returning
-        lt.lock();
+        // check if context was interrupted
+        this_fiber::interruption_point();
         return status;
     }
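The rewritten wait() above follows the classic condition_variable_any protocol: lock the internal spinlock, unlock the external lock, block while the internal lock is released atomically, then reacquire the external lock before returning. A minimal sketch of the same sequence with std threads instead of fibers — cv.wait( lk) plays the role of ctx->suspend( lk), and wait_sketch and signaled are illustrative names, not part of Boost.Fiber:

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// "internal" stands in for wait_queue_splk_; cv.wait( lk) releases it
// atomically while blocking, which is the same guarantee the patch obtains
// by unlocking the spinlock only *after* the context switch.
template< typename LockType >
void wait_sketch( std::condition_variable & cv, std::mutex & internal,
                  bool & signaled, LockType & lt) {
    std::unique_lock< std::mutex > lk( internal); // lock wait-queue "spinlock"
    lt.unlock();                                  // unlock external lt
    cv.wait( lk, [&signaled]{ return signaled; });// suspend; releases lk atomically
    lk.unlock();                                  // unlock local lk
    lt.lock();                                    // relock external before returning
}

int main() {
    std::condition_variable cv;
    std::mutex internal, external;
    bool signaled = false;

    std::thread waiter([&] {
        std::unique_lock< std::mutex > lt( external);
        wait_sketch( cv, internal, signaled, lt);
        std::cout << "woken with external lock held\n";
    });
    std::thread notifier([&] {
        { std::lock_guard< std::mutex > guard( internal); signaled = true; }
        cv.notify_one();
    });
    waiter.join();
    notifier.join();
}
```

The sketch omits the wait-queue bookkeeping (wait_link()/wait_unlink()) that the real code performs under the same spinlock.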
diff --git a/include/boost/fiber/context.hpp b/include/boost/fiber/context.hpp
index 3c14d768..d0c52bbc 100644
--- a/include/boost/fiber/context.hpp
+++ b/include/boost/fiber/context.hpp
@@ -155,6 +155,11 @@ private:
         }
     };

+    struct data_t {
+        detail::spinlock_lock * lk{ nullptr };
+        context * ctx{ nullptr };
+    };
+
     typedef std::map< uintptr_t, fss_data > fss_data_t;

     static thread_local context * active_;
@@ -169,6 +174,9 @@ private:
     scheduler * scheduler_{ nullptr };
     boost::context::execution_context ctx_;

+    void resume_( data_t &) noexcept;
+    void set_ready_( context *) noexcept;
+
 public:
     detail::ready_hook ready_hook_{};
     detail::remote_ready_hook remote_ready_hook_{};
@@ -266,17 +274,16 @@ public:
         // mutable: generated operator() is not const -> enables std::move( fn)
         // std::make_tuple: stores decayed copies of its args, implicitly unwraps std::reference_wrapper
         [this,fn_=std::forward< Fn >( fn),tpl_=std::make_tuple( std::forward< Args >( args) ...),
-         ctx=boost::context::execution_context::current()] (void *) mutable noexcept {
+         ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept {
             try {
+                data_t * dp = static_cast< data_t * >( vp);
+                if ( nullptr != dp->lk) {
+                    dp->lk->unlock();
+                } else if ( nullptr != dp->ctx) {
+                    active_->set_ready_( dp->ctx);
+                }
                 auto fn = std::move( fn_);
                 auto tpl = std::move( tpl_);
-                // jump back after initialization
-                void * vp = ctx();
-                // execute returned functor
-                if ( nullptr != vp) {
-                    std::function< void() > * func( static_cast< std::function< void() > * >( vp) );
-                    ( * func)();
-                }
                 boost::context::detail::do_invoke( fn, tpl);
             } catch ( fiber_interrupted const&) {
             }
@@ -284,8 +291,6 @@ public:
             terminate();
             BOOST_ASSERT_MSG( false, "fiber already terminated");
         }} {
-        // switch for initialization
-        ctx_();
     }

     virtual ~context() noexcept;
@@ -294,9 +299,12 @@ public:

     id get_id() const noexcept;

-    void resume( std::function< void() > *) noexcept;
+    void resume() noexcept;
+    void resume( detail::spinlock_lock &) noexcept;
+    void resume( context *) noexcept;

-    void suspend( std::function< void() > * = nullptr) noexcept;
+    void suspend() noexcept;
+    void suspend( detail::spinlock_lock &) noexcept;

     void join();
@@ -304,8 +312,9 @@ public:

     void terminate() noexcept;

+    bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
     bool wait_until( std::chrono::steady_clock::time_point const&,
-                     std::function< void() > * = nullptr) noexcept;
+                     detail::spinlock_lock &) noexcept;

     void set_ready( context *) noexcept;
diff --git a/include/boost/fiber/scheduler.hpp b/include/boost/fiber/scheduler.hpp
index 9e18df36..538a0053 100644
--- a/include/boost/fiber/scheduler.hpp
+++ b/include/boost/fiber/scheduler.hpp
@@ -83,7 +83,9 @@ private:
     detail::spinlock remote_ready_splk_{};
     detail::spinlock worker_splk_{};

-    void resume_( context *, context *, std::function< void() > *) noexcept;
+    void resume_( context *, context *) noexcept;
+    void resume_( context *, context *, detail::spinlock_lock &) noexcept;
+    void resume_( context *, context *, context *) noexcept;

     context * get_next_() noexcept;
@@ -111,12 +113,15 @@ public:

     void yield( context *) noexcept;

+    bool wait_until( context *,
+                     std::chrono::steady_clock::time_point const&) noexcept;
     bool wait_until( context *,
                      std::chrono::steady_clock::time_point const&,
-                     std::function< void() > * = nullptr) noexcept;
+                     detail::spinlock_lock &) noexcept;

+    void suspend( context *) noexcept;
     void suspend( context *,
-                  std::function< void() > * = nullptr) noexcept;
+                  detail::spinlock_lock &) noexcept;

     bool has_ready_fibers() const noexcept;
diff --git a/src/context.cpp b/src/context.cpp
index 038b1b14..61b1eb62 100644
--- a/src/context.cpp
+++ b/src/context.cpp
@@ -146,6 +146,21 @@ context::reset_active() noexcept {
     active_ = nullptr;
 }

+void
+context::resume_( data_t & d) noexcept {
+    data_t * dp = static_cast< data_t * >( ctx_( & d) );
+    if ( nullptr != dp->lk) {
+        dp->lk->unlock();
+    } else if ( nullptr != dp->ctx) {
+        active_->set_ready_( dp->ctx);
+    }
+}
+
+void
+context::set_ready_( context * ctx) noexcept {
+    scheduler_->set_ready( ctx);
+}
+
 // main fiber context
 context::context( main_context_t) noexcept :
     use_count_{ 1 }, // allocated on main- or thread-stack
@@ -159,9 +174,11 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall
     flags_{ flag_dispatcher_context },
     ctx_{ std::allocator_arg, palloc, salloc,
           [this,sched] (void * vp) noexcept {
-            if ( nullptr != vp) {
-                std::function< void() > * func( static_cast< std::function< void() > * >( vp) );
-                ( * func)();
+            data_t * dp = static_cast< data_t * >( vp);
+            if ( nullptr != dp->lk) {
+                dp->lk->unlock();
+            } else if ( nullptr != dp->ctx) {
+                active_->set_ready_( dp->ctx);
             }
             // execute scheduler::dispatch()
             sched->dispatch();
@@ -190,20 +207,43 @@ context::get_id() const noexcept {
 }

 void
-context::resume( std::function< void() > * func) noexcept {
+context::resume() noexcept {
     context * prev = this;
     // active_ will point to `this`
     // prev will point to previous active context
     std::swap( active_, prev);
-    func = static_cast< std::function< void() > * >( ctx_( func) );
-    if ( nullptr != func) {
-        ( * func)();
-    }
+    data_t d{};
+    resume_( d);
 }

 void
-context::suspend( std::function< void() > * func) noexcept {
-    scheduler_->suspend( this, func);
+context::resume( detail::spinlock_lock & lk) noexcept {
+    context * prev = this;
+    // active_ will point to `this`
+    // prev will point to previous active context
+    std::swap( active_, prev);
+    data_t d{ & lk, nullptr };
+    resume_( d);
+}
+
+void
+context::resume( context * ready_ctx) noexcept {
+    context * prev = this;
+    // active_ will point to `this`
+    // prev will point to previous active context
+    std::swap( active_, prev);
+    data_t d{ nullptr, ready_ctx };
+    resume_( d);
+}
+
+void
+context::suspend() noexcept {
+    scheduler_->suspend( this);
+}
+
+void
+context::suspend( detail::spinlock_lock & lk) noexcept {
+    scheduler_->suspend( this, lk);
 }

 void
@@ -265,11 +305,18 @@ context::terminate() noexcept {
 }

 bool
-context::wait_until( std::chrono::steady_clock::time_point const& tp,
-                     std::function< void() > * func) noexcept {
+context::wait_until( std::chrono::steady_clock::time_point const& tp) noexcept {
     BOOST_ASSERT( nullptr != scheduler_);
     BOOST_ASSERT( this == active_);
-    return scheduler_->wait_until( this, tp, func);
+    return scheduler_->wait_until( this, tp);
+}
+
+bool
+context::wait_until( std::chrono::steady_clock::time_point const& tp,
+                     detail::spinlock_lock & lk) noexcept {
+    BOOST_ASSERT( nullptr != scheduler_);
+    BOOST_ASSERT( this == active_);
+    return scheduler_->wait_until( this, tp, lk);
 }

 void
diff --git a/src/mutex.cpp b/src/mutex.cpp
index fc7c4b41..6067f7a4 100644
--- a/src/mutex.cpp
+++ b/src/mutex.cpp
@@ -34,11 +34,8 @@ mutex::lock() {
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        ctx->suspend( & func);
+        ctx->suspend( lk);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
     }
diff --git a/src/recursive_mutex.cpp b/src/recursive_mutex.cpp
index b2d8206e..bb3342a9 100644
--- a/src/recursive_mutex.cpp
+++ b/src/recursive_mutex.cpp
@@ -34,11 +34,8 @@ recursive_mutex::lock() {
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        ctx->suspend( & func);
+        ctx->suspend( lk);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
     }
diff --git a/src/recursive_timed_mutex.cpp b/src/recursive_timed_mutex.cpp
index 49b5637f..12a99a72 100644
--- a/src/recursive_timed_mutex.cpp
+++ b/src/recursive_timed_mutex.cpp
@@ -37,11 +37,8 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber until notified or timed-out
-        if ( ! context::active()->wait_until( timeout_time, & func) ) {
+        if ( ! context::active()->wait_until( timeout_time, lk) ) {
             // remove fiber from wait-queue
             lk.lock();
             ctx->wait_unlink();
@@ -66,11 +63,8 @@ recursive_timed_mutex::lock() {
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        ctx->suspend( & func);
+        ctx->suspend( lk);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
     }
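All four mutex variants now hand their wait-queue spinlock straight to suspend()/wait_until(), so the lock is released only after the suspending fiber has left its stack; releasing it before the switch would let a notifier resume a fiber that is still running. A standalone sketch of why the hand-off is safe at all, assuming a spinlock built on std::atomic_flag — which, unlike std::mutex, may legally be unlocked from a different thread, the property the deferred dp->lk->unlock() relies on (spinlock here is a stand-in, not Boost.Fiber's detail::spinlock):

```cpp
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

// A spinlock whose unlock may run on a different thread than its lock.
struct spinlock {
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
    void lock() {
        while ( flag.test_and_set( std::memory_order_acquire) ) {
            std::this_thread::yield();
        }
    }
    void unlock() { flag.clear( std::memory_order_release); }
};

int main() {
    spinlock wait_queue_splk;
    std::vector< int > wait_queue;

    // "suspending fiber": enqueue itself while holding the lock, then
    // leave without releasing it -- no notifier can observe the queue
    // between the enqueue and the switch.
    std::thread fiber([&] {
        wait_queue_splk.lock();
        wait_queue.push_back( 1); // wait_link( wait_queue_)
        // lock intentionally still held when this thread ends
    });
    fiber.join();

    // "resumed context": its first action is the deferred unlock.
    std::thread next([&] {
        wait_queue_splk.unlock(); // corresponds to dp->lk->unlock()
        wait_queue_splk.lock();   // a notifier can now scan the queue safely
        std::cout << "waiters: " << wait_queue.size() << '\n';
        wait_queue_splk.unlock();
    });
    next.join();
}
```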
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 3dc6328d..fdf821ef 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -23,7 +23,7 @@ namespace boost {
 namespace fibers {

 void
-scheduler::resume_( context * active_ctx, context * ctx, std::function< void() > * func) noexcept {
+scheduler::resume_( context * active_ctx, context * ctx) noexcept {
     BOOST_ASSERT( nullptr != active_ctx);
     BOOST_ASSERT( nullptr != ctx);
     //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
@@ -32,14 +32,44 @@ scheduler::resume_( context * active_ctx, context * ctx, std::function< void() >
     BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
     BOOST_ASSERT( active_ctx != ctx);
     // resume active-fiber == ctx
-    ctx->resume( func);
+    ctx->resume();
+    BOOST_ASSERT( context::active() == active_ctx);
+}
+
+void
+scheduler::resume_( context * active_ctx, context * ctx, detail::spinlock_lock & lk) noexcept {
+    BOOST_ASSERT( nullptr != active_ctx);
+    BOOST_ASSERT( nullptr != ctx);
+    //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
+    BOOST_ASSERT( this == active_ctx->get_scheduler() );
+    BOOST_ASSERT( this == ctx->get_scheduler() );
+    BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
+    BOOST_ASSERT( active_ctx != ctx);
+    // resume active-fiber == ctx
+    ctx->resume( lk);
+    BOOST_ASSERT( context::active() == active_ctx);
+}
+
+void
+scheduler::resume_( context * active_ctx, context * ctx, context * ready_ctx) noexcept {
+    BOOST_ASSERT( nullptr != active_ctx);
+    BOOST_ASSERT( nullptr != ctx);
+    BOOST_ASSERT( ready_ctx == active_ctx);
+    //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
+    BOOST_ASSERT( this == active_ctx->get_scheduler() );
+    BOOST_ASSERT( this == ctx->get_scheduler() );
+    BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
+    BOOST_ASSERT( active_ctx != ctx);
+    // resume active-fiber == ctx
+    ctx->resume( ready_ctx);
     BOOST_ASSERT( context::active() == active_ctx);
 }

 context *
 scheduler::get_next_() noexcept {
     context * ctx = sched_algo_->pick_next();
-    BOOST_ASSERT( nullptr == ctx || this == ctx->get_scheduler() );
+    //BOOST_ASSERT( nullptr == ctx);
+    //BOOST_ASSERT( this == ctx->get_scheduler() );
     return ctx;
 }
@@ -123,7 +153,7 @@ scheduler::~scheduler() noexcept {
     // signal dispatcher-context termination
     shutdown_ = true;
     // resume pending fibers
-    resume_( main_ctx_, get_next_(), nullptr);
+    resume_( main_ctx_, get_next_() );
     // no context' in worker-queue
     //BOOST_ASSERT( worker_queue_.empty() );
     BOOST_ASSERT( terminated_queue_.empty() );
@@ -154,7 +184,7 @@ scheduler::dispatch() noexcept {
             // push dispatcher-context to ready-queue
             // so that ready-queue never becomes empty
             sched_algo_->awakened( dispatcher_ctx_.get() );
-            resume_( dispatcher_ctx_.get(), ctx, nullptr);
+            resume_( dispatcher_ctx_.get(), ctx);
             BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
         } else {
             // no ready context, wait till signaled
@@ -191,7 +221,7 @@ scheduler::dispatch() noexcept {
         if ( nullptr != ( ctx = get_next_() ) ) {
             // resume ready context's
             sched_algo_->awakened( dispatcher_ctx_.get() );
-            resume_( dispatcher_ctx_.get(), ctx, nullptr);
+            resume_( dispatcher_ctx_.get(), ctx);
             BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
         }
     }
     // release termianted context'
     release_terminated_();
     // return to main-context
-    resume_( dispatcher_ctx_.get(), main_ctx_, nullptr);
+    resume_( dispatcher_ctx_.get(), main_ctx_);
 }

 void
@@ -262,7 +292,7 @@ scheduler::set_terminated( context * active_ctx) noexcept {
     // intrusive_ptr_release( ctx);
     active_ctx->terminated_link( terminated_queue_);
     // resume another fiber
-    resume_( active_ctx, get_next_(), nullptr);
+    resume_( active_ctx, get_next_() );
 }

 void
@@ -280,17 +310,13 @@ scheduler::yield( context * active_ctx) noexcept {
     // from one ready-queue) the context must be
     // already suspended until another thread resumes it
     // (== maked as ready)
-    std::function< void() > func([this,active_ctx](){
-        set_ready( active_ctx);
-    });
     // resume another fiber
-    resume_( active_ctx, get_next_(), & func);
+    resume_( active_ctx, get_next_(), active_ctx);
 }

 bool
 scheduler::wait_until( context * active_ctx,
-                       std::chrono::steady_clock::time_point const& sleep_tp,
-                       std::function< void() > * func) noexcept {
+                       std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
     BOOST_ASSERT( nullptr != active_ctx);
     //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
     BOOST_ASSERT( ! active_ctx->is_terminated() );
@@ -311,19 +337,57 @@ scheduler::wait_until( context * active_ctx,
     active_ctx->tp_ = sleep_tp;
     active_ctx->sleep_link( sleep_queue_);
     // resume another context
-    resume_( active_ctx, get_next_(), func);
+    resume_( active_ctx, get_next_() );
+    // context has been resumed
+    // check if the deadline has been reached
+    return std::chrono::steady_clock::now() < sleep_tp;
+}
+
+bool
+scheduler::wait_until( context * active_ctx,
+                       std::chrono::steady_clock::time_point const& sleep_tp,
+                       detail::spinlock_lock & lk) noexcept {
+    BOOST_ASSERT( nullptr != active_ctx);
+    //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
+    BOOST_ASSERT( ! active_ctx->is_terminated() );
+    // if the active-fiber running in this thread calls
+    // condition::wait() and code in another thread calls
+    // condition::notify_one(), it might happen that the
+    // other thread pushes the fiber to the remote ready-queue first
+    // the dispatcher-context might have moved the fiber from
+    // the remote ready-queue to the local ready-queue
+    // so we do not check
+    //BOOST_ASSERT( active_ctx->ready_is_linked() );
+    BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
+    // active_ctx->wait_is_linked() might return true
+    // if context was locked inside timed_mutex::try_lock_until()
+    // context::wait_is_linked() is not synchronized
+    // with other threads
+    // push active context to sleep-queue
+    active_ctx->tp_ = sleep_tp;
+    active_ctx->sleep_link( sleep_queue_);
+    // resume another context
+    resume_( active_ctx, get_next_(), lk);
     // context has been resumed
     // check if deadline has reached
     return std::chrono::steady_clock::now() < sleep_tp;
 }

 void
-scheduler::suspend( context * active_ctx,
-                    std::function< void() > * func) noexcept {
+scheduler::suspend( context * active_ctx) noexcept {
     BOOST_ASSERT( nullptr != active_ctx);
     //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
     // resume another context
-    resume_( active_ctx, get_next_(), func);
+    resume_( active_ctx, get_next_() );
+}
+
+void
+scheduler::suspend( context * active_ctx,
+                    detail::spinlock_lock & lk) noexcept {
+    BOOST_ASSERT( nullptr != active_ctx);
+    //BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
+    // resume another context
+    resume_( active_ctx, get_next_(), lk);
 }

 bool
diff --git a/src/timed_mutex.cpp b/src/timed_mutex.cpp
index f070050d..f05b5663 100644
--- a/src/timed_mutex.cpp
+++ b/src/timed_mutex.cpp
@@ -33,11 +33,8 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber until notified or timed-out
-        if ( ! context::active()->wait_until( timeout_time, & func) ) {
+        if ( ! context::active()->wait_until( timeout_time, lk) ) {
             // remove fiber from wait-queue
             lk.lock();
             ctx->wait_unlink();
@@ -61,11 +58,8 @@ timed_mutex::lock() {
         }
         BOOST_ASSERT( ! ctx->wait_is_linked() );
         ctx->wait_link( wait_queue_);
-        std::function< void() > func([&lk](){
-            lk.unlock();
-        });
         // suspend this fiber
-        ctx->suspend( & func);
+        ctx->suspend( lk);
         BOOST_ASSERT( ! ctx->wait_is_linked() );
     }
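The core mechanism of the patch is the data_t record carried through the execution_context switch: the resumed side inspects it and performs the one action the suspending side could not safely perform on its own stack. A self-contained sketch of that dispatch, with stub types standing in for detail::spinlock_lock and context, and on_resumed as an illustrative name for the code that runs in context::resume_() and the fiber trampolines:

```cpp
#include <cassert>
#include <iostream>

// Stub stand-ins for the real types (assumed shapes, illustration only).
struct spinlock_lock { bool locked{ true }; void unlock() { locked = false; } };
struct context { bool ready{ false }; };

// Mirrors the patch's data_t: at most one member is non-null per switch.
struct data_t {
    spinlock_lock * lk{ nullptr }; // resume( lk): unlock after leaving old stack
    context * ctx{ nullptr };      // resume( ctx): re-ready the previous context
};

// First thing the resumed side does with the void* delivered by ctx_( & d):
void on_resumed( void * vp) {
    data_t * dp = static_cast< data_t * >( vp);
    if ( nullptr != dp->lk) {
        dp->lk->unlock();          // suspend( lk) case: deferred spinlock release
    } else if ( nullptr != dp->ctx) {
        dp->ctx->ready = true;     // yield() case: set_ready_( dp->ctx)
    }                              // plain resume(): both null, nothing to do
}

int main() {
    spinlock_lock lk;
    data_t d1{ & lk, nullptr };
    on_resumed( & d1);
    assert( ! lk.locked);

    context prev;
    data_t d2{ nullptr, & prev };
    on_resumed( & d2);
    assert( prev.ready);
    std::cout << "both transfer cases handled\n";
}
```

This replaces the heap-allocating std::function<void()> closures with a two-pointer POD passed by address, which is why every suspend path in the patch shrinks to a single call taking the lock (or context) directly.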
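Both scheduler::wait_until() overloads report timeout purely from the clock after the fiber is resumed: returning steady_clock::now() < sleep_tp means "woken before the deadline". A small sketch of that convention with plain threads, where wait_until_sketch is a hypothetical name and sleep_for stands in for being suspended and later resumed:

```cpp
#include <chrono>
#include <iostream>
#include <thread>

// Returns true if control came back before the deadline ("notified"),
// false otherwise ("timeout") -- the same test the scheduler performs.
bool wait_until_sketch( std::chrono::steady_clock::time_point sleep_tp,
                        std::chrono::milliseconds resumed_after) {
    std::this_thread::sleep_for( resumed_after);         // "resumed" here
    return std::chrono::steady_clock::now() < sleep_tp;  // false => timeout
}

int main() {
    using namespace std::chrono;
    auto deadline = steady_clock::now() + milliseconds( 50);
    std::cout << ( wait_until_sketch( deadline, milliseconds( 10))
                   ? "notified" : "timeout") << '\n';    // notified
    deadline = steady_clock::now() + milliseconds( 10);
    std::cout << ( wait_until_sketch( deadline, milliseconds( 50))
                   ? "notified" : "timeout") << '\n';    // timeout
}
```

On a false return, callers such as timed_mutex::try_lock_until_ above relock the wait-queue spinlock and unlink themselves before reporting failure.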