2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-19 02:12:24 +00:00

execution_context required for segmented stacks

This commit is contained in:
Oliver Kowalke
2016-02-06 10:21:54 +01:00
parent 7e183329ee
commit 12555f9561
5 changed files with 236 additions and 92 deletions

View File

@@ -18,7 +18,11 @@
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/context/detail/apply.hpp>
#include <boost/context/captured_context.hpp>
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
# include <boost/context/captured_context.hpp>
#else
# include <boost/context/execution_context.hpp>
#endif
#include <boost/context/stack_context.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
@@ -157,6 +161,7 @@ private:
}
};
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
struct data_t {
detail::spinlock_lock * lk{ nullptr };
context * ctx{ nullptr };
@@ -178,6 +183,22 @@ private:
from{ from_ } {
}
};
#else
struct data_t {
detail::spinlock_lock * lk{ nullptr };
context * ctx{ nullptr };
constexpr data_t() noexcept = default;
explicit data_t( detail::spinlock_lock * lk_) noexcept :
lk{ lk_ } {
}
explicit data_t( context * ctx_) noexcept :
ctx{ ctx_ } {
}
};
#endif
typedef std::map< uintptr_t, fss_data > fss_data_t;
@@ -191,11 +212,16 @@ private:
int flags_;
#endif
scheduler * scheduler_{ nullptr };
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context ctx_;
#else
boost::context::execution_context ctx_;
#endif
void resume_( data_t &) noexcept;
void set_ready_( context *) noexcept;
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
template< typename Fn, typename Tpl >
boost::context::captured_context
run_( boost::context::captured_context ctx, Fn && fn_, Tpl && tpl_, data_t * dp) noexcept {
@@ -215,6 +241,25 @@ private:
// terminate context
return set_terminated();
}
#else
template< typename Fn, typename Tpl >
void run_( Fn && fn_, Tpl && tpl_, data_t * dp) noexcept {
try {
typename std::decay< Fn >::type fn = std::forward< Fn >( fn_);
typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active_->set_ready_( dp->ctx);
}
boost::context::detail::apply( std::move( fn), std::move( tpl) );
} catch ( fiber_interrupted const&) {
}
// terminate context
set_terminated();
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
#endif
public:
detail::ready_hook ready_hook_{};
@@ -312,7 +357,8 @@ public:
Fn && fn, Tpl && tpl) :
use_count_{ 1 }, // fiber instance or scheduler owner
flags_{ flag_worker_context },
#if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
ctx_{ std::allocator_arg, palloc, salloc,
detail::wrap(
[this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
@@ -322,12 +368,32 @@ public:
std::forward< Fn >( fn),
std::forward< Tpl >( tpl) )}
#else
# else
ctx_{ std::allocator_arg, palloc, salloc,
[this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)]
(boost::context::captured_context ctx, void * vp) mutable noexcept {
return run_( std::move( ctx), std::move( fn), std::move( tpl), static_cast< data_t * >( vp) );
}}
# endif
#else
# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
ctx_{ std::allocator_arg, palloc, salloc,
detail::wrap(
[this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
boost::context::execution_context & ctx, void * vp) mutable noexcept {
run_( std::move( fn), std::move( tpl), static_cast< data_t * >( vp) );
},
std::forward< Fn >( fn),
std::forward< Tpl >( tpl),
boost::context::execution_context::current() )
}
# else
ctx_{ std::allocator_arg, palloc, salloc,
[this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl),
ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept {
run_( std::move( fn), std::move( tpl), static_cast< data_t * >( vp) );
}}
# endif
#endif
{}
@@ -346,14 +412,17 @@ public:
void suspend() noexcept;
void suspend( detail::spinlock_lock &) noexcept;
boost::context::captured_context suspend_with_cc() noexcept;
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context suspend_with_cc() noexcept;
boost::context::captured_context set_terminated() noexcept;
#else
void set_terminated() noexcept;
#endif
void join();
void yield() noexcept;
boost::context::captured_context set_terminated() noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
@@ -472,11 +541,18 @@ public:
friend void intrusive_ptr_release( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
if ( 0 == --ctx->use_count_) {
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context cc( std::move( ctx->ctx_) );
// destruct context
ctx->~context();
// deallocated stack
cc();
#else
boost::context::execution_context ec( ctx->ctx_);
// destruct context
// deallocates stack (execution_context is ref counted)
ctx->~context();
#endif
}
}
};

View File

@@ -11,7 +11,11 @@
#include <boost/config.hpp>
#include <boost/context/detail/invoke.hpp>
#include <boost/context/execution_context.hpp>
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
# include <boost/context/captured_context.hpp>
#else
# include <boost/context/execution_context.hpp>
#endif
#include <boost/fiber/detail/config.hpp>
@@ -23,6 +27,7 @@ namespace boost {
namespace fibers {
namespace detail {
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
template< typename Fn1, typename Fn2, typename Tpl >
class wrapper {
private:
@@ -59,6 +64,48 @@ wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) {
std::forward< Fn2 >( fn2),
std::forward< Tpl >( tpl) );
}
#else
template< typename Fn1, typename Fn2, typename Tpl >
class wrapper {
private:
typename std::decay< Fn1 >::type fn1_;
typename std::decay< Fn2 >::type fn2_;
typename std::decay< Tpl >::type tpl_;
boost::context::execution_context ctx_;
public:
wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
boost::context::execution_context const& ctx) :
fn1_( std::move( fn1) ),
fn2_( std::move( fn2) ),
tpl_( std::move( tpl) ),
ctx_{ ctx } {
}
wrapper( wrapper const&) = delete;
wrapper & operator=( wrapper const&) = delete;
wrapper( wrapper && other) = default;
wrapper & operator=( wrapper && other) = default;
void operator()( void * vp) {
boost::context::detail::invoke(
std::move( fn1_),
fn2_, tpl_, ctx_, vp);
}
};
template< typename Fn1, typename Fn2, typename Tpl >
wrapper< Fn1, Fn2, Tpl >
wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
boost::context::execution_context const& ctx) {
return wrapper< Fn1, Fn2, Tpl >(
std::forward< Fn1 >( fn1),
std::forward< Fn2 >( fn2),
std::forward< Tpl >( tpl),
ctx);
}
#endif
}}}

View File

@@ -84,10 +84,6 @@ private:
detail::spinlock remote_ready_splk_{};
detail::spinlock worker_splk_{};
void resume_( context *, context *) noexcept;
void resume_( context *, context *, detail::spinlock_lock &) noexcept;
void resume_( context *, context *, context *) noexcept;
context * get_next_() noexcept;
void release_terminated_() noexcept;
@@ -104,13 +100,19 @@ public:
virtual ~scheduler();
boost::context::captured_context dispatch() noexcept;
void set_ready( context *) noexcept;
void set_remote_ready( context *) noexcept;
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context dispatch() noexcept;
boost::context::captured_context set_terminated( context *) noexcept;
#else
void dispatch() noexcept;
void set_terminated( context *) noexcept;
#endif
void yield( context *) noexcept;

View File

@@ -134,6 +134,10 @@ context_initializer::~context_initializer() {
context *
context::active() noexcept {
#if defined(BOOST_USE_EXECUTION_CONTEXT)
// initialized the first time control passes; per thread
thread_local static boost::context::detail::activation_record_initializer rec_initializer;
#endif
// initialized the first time control passes; per thread
thread_local static context_initializer ctx_initializer;
return active_;
@@ -144,6 +148,7 @@ context::reset_active() noexcept {
active_ = nullptr;
}
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
void
context::resume_( data_t & d) noexcept {
auto result = ctx_( & d);
@@ -157,6 +162,17 @@ context::resume_( data_t & d) noexcept {
}
}
}
#else
void
context::resume_( data_t & d) noexcept {
data_t * dp = static_cast< data_t * >( ctx_( & d) );
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active_->set_ready_( dp->ctx);
}
}
#endif
void
context::set_ready_( context * ctx) noexcept {
@@ -167,15 +183,20 @@ context::set_ready_( context * ctx) noexcept {
context::context( main_context_t) noexcept :
use_count_{ 1 }, // allocated on main- or thread-stack
flags_{ flag_main_context },
ctx_{} {
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
ctx_() {
#else
ctx_{ boost::context::execution_context::current() } {
#endif
}
// dispatcher fiber context
context::context( dispatcher_context_t, boost::context::preallocated const& palloc,
default_stack const& salloc, scheduler * sched) :
flags_{ flag_dispatcher_context },
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
ctx_{ std::allocator_arg, palloc, salloc,
[this,sched] ( boost::context::captured_context ctx, void * vp) noexcept {
[this,sched](boost::context::captured_context ctx, void * vp) noexcept {
data_t * dp = static_cast< data_t * >( vp);
// update captured_context of calling fiber
dp->from->ctx_ = std::move( ctx);
@@ -186,8 +207,23 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall
}
// execute scheduler::dispatch()
return sched->dispatch();
}} {
}
}}
#else
ctx_{ std::allocator_arg, palloc, salloc,
[this,sched](void * vp) noexcept {
data_t * dp = static_cast< data_t * >( vp);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active_->set_ready_( dp->ctx);
}
// execute scheduler::dispatch()
sched->dispatch();
// dispatcher context should never return from scheduler::dispatch()
BOOST_ASSERT_MSG( false, "disatcher fiber already terminated");
}}
#endif
{}
context::~context() {
BOOST_ASSERT( wait_queue_.empty() );
@@ -207,24 +243,17 @@ context::get_id() const noexcept {
return id( const_cast< context * >( this) );
}
boost::context::captured_context
context::suspend_with_cc() noexcept {
context * prev = this;
// active_ will point to `this`
// prev will point to previous active context
std::swap( active_, prev);
data_t d{ prev };
// context switch
return std::move( std::get< 0 >( ctx_( & d) ) );
}
void
context::resume() noexcept {
context * prev = this;
// active_ will point to `this`
// prev will point to previous active context
std::swap( active_, prev);
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
data_t d{ prev };
#else
data_t d{};
#endif
resume_( d);
}
@@ -234,7 +263,11 @@ context::resume( detail::spinlock_lock & lk) noexcept {
// active_ will point to `this`
// prev will point to previous active context
std::swap( active_, prev);
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
data_t d{ & lk, prev };
#else
data_t d{ & lk };
#endif
resume_( d);
}
@@ -244,7 +277,11 @@ context::resume( context * ready_ctx) noexcept {
// active_ will point to `this`
// prev will point to previous active context
std::swap( active_, prev);
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
data_t d{ ready_ctx, prev };
#else
data_t d{ ready_ctx };
#endif
resume_( d);
}
@@ -290,7 +327,24 @@ context::yield() noexcept {
scheduler_->yield( context::active() );
}
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context
context::suspend_with_cc() noexcept {
context * prev = this;
// active_ will point to `this`
// prev will point to previous active context
std::swap( active_, prev);
data_t d{ prev };
// context switch
return std::move( std::get< 0 >( ctx_( & d) ) );
}
#endif
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context
#else
void
#endif
context::set_terminated() noexcept {
// protect for concurrent access
std::unique_lock< detail::spinlock > lk( splk_);

View File

@@ -22,60 +22,6 @@
namespace boost {
namespace fibers {
boost::context::captured_context
suspend_with_cc( context * active_ctx, context * ctx) {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx != ctx);
// resume active-fiber == ctx
return ctx->suspend_with_cc();
}
void
scheduler::resume_( context * active_ctx, context * ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( this == active_ctx->get_scheduler() );
BOOST_ASSERT( this == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx != ctx);
// resume active-fiber == ctx
ctx->resume();
BOOST_ASSERT( context::active() == active_ctx);
}
void
scheduler::resume_( context * active_ctx, context * ctx, detail::spinlock_lock & lk) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( this == active_ctx->get_scheduler() );
BOOST_ASSERT( this == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx != ctx);
// resume active-fiber == ctx
ctx->resume( lk);
BOOST_ASSERT( context::active() == active_ctx);
}
void
scheduler::resume_( context * active_ctx, context * ctx, context * ready_ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ready_ctx == active_ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
BOOST_ASSERT( this == active_ctx->get_scheduler() );
BOOST_ASSERT( this == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
BOOST_ASSERT( active_ctx != ctx);
// resume active-fiber == ctx
ctx->resume( ready_ctx);
BOOST_ASSERT( context::active() == active_ctx);
}
context *
scheduler::get_next_() noexcept {
context * ctx = sched_algo_->pick_next();
@@ -164,7 +110,7 @@ scheduler::~scheduler() {
// signal dispatcher-context termination
shutdown_ = true;
// resume pending fibers
resume_( main_ctx_, get_next_() );
get_next_()->resume();
// no context' in worker-queue
//BOOST_ASSERT( worker_queue_.empty() );
BOOST_ASSERT( terminated_queue_.empty() );
@@ -179,8 +125,13 @@ scheduler::~scheduler() {
main_ctx_ = nullptr;
}
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context
scheduler::dispatch() noexcept {
#else
void
scheduler::dispatch() noexcept {
#endif
BOOST_ASSERT( context::active() == dispatcher_ctx_);
while ( ! shutdown_) {
// release termianted context'
@@ -195,7 +146,7 @@ scheduler::dispatch() noexcept {
// push dispatcher-context to ready-queue
// so that ready-queue never becomes empty
sched_algo_->awakened( dispatcher_ctx_.get() );
resume_( dispatcher_ctx_.get(), ctx);
ctx->resume();
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
} else {
// no ready context, wait till signaled
@@ -232,7 +183,7 @@ scheduler::dispatch() noexcept {
if ( nullptr != ( ctx = get_next_() ) ) {
// resume ready context's
sched_algo_->awakened( dispatcher_ctx_.get() );
resume_( dispatcher_ctx_.get(), ctx);
ctx->resume();
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
}
}
@@ -240,7 +191,11 @@ scheduler::dispatch() noexcept {
// release termianted context'
release_terminated_();
// return to main-context
return suspend_with_cc( dispatcher_ctx_.get(), main_ctx_);
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
return main_ctx_->suspend_with_cc();
#else
main_ctx_->resume();
#endif
}
void
@@ -287,8 +242,13 @@ scheduler::set_remote_ready( context * ctx) noexcept {
sched_algo_->notify();
}
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
boost::context::captured_context
scheduler::set_terminated( context * active_ctx) noexcept {
#else
void
scheduler::set_terminated( context * active_ctx) noexcept {
#endif
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( context::active() == active_ctx);
BOOST_ASSERT( ! active_ctx->is_main_context() );
@@ -303,7 +263,12 @@ scheduler::set_terminated( context * active_ctx) noexcept {
// intrusive_ptr_release( ctx);
active_ctx->terminated_link( terminated_queue_);
// resume another fiber
return suspend_with_cc( active_ctx, get_next_() );
#if ! defined(BOOST_USE_EXECUTION_CONTEXT)
return get_next_()->suspend_with_cc();
#else
// resume another fiber
get_next_()->resume();
#endif
}
void
@@ -322,7 +287,7 @@ scheduler::yield( context * active_ctx) noexcept {
// already suspended until another thread resumes it
// (== maked as ready)
// resume another fiber
resume_( active_ctx, get_next_(), active_ctx);
get_next_()->resume( active_ctx);
}
bool
@@ -348,7 +313,7 @@ scheduler::wait_until( context * active_ctx,
active_ctx->tp_ = sleep_tp;
active_ctx->sleep_link( sleep_queue_);
// resume another context
resume_( active_ctx, get_next_() );
get_next_()->resume();
// context has been resumed
// check if deadline has reached
return std::chrono::steady_clock::now() < sleep_tp;
@@ -378,7 +343,7 @@ scheduler::wait_until( context * active_ctx,
active_ctx->tp_ = sleep_tp;
active_ctx->sleep_link( sleep_queue_);
// resume another context
resume_( active_ctx, get_next_(), lk);
get_next_()->resume( lk);
// context has been resumed
// check if deadline has reached
return std::chrono::steady_clock::now() < sleep_tp;
@@ -389,7 +354,7 @@ scheduler::suspend( context * active_ctx) noexcept {
BOOST_ASSERT( nullptr != active_ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
// resume another context
resume_( active_ctx, get_next_() );
get_next_()->resume();
}
void
@@ -398,7 +363,7 @@ scheduler::suspend( context * active_ctx,
BOOST_ASSERT( nullptr != active_ctx);
//BOOST_ASSERT( main_ctx_ == active_ctx || dispatcher_ctx_.get() == active_ctx || active_ctx->worker_is_linked() );
// resume another context
resume_( active_ctx, get_next_(), lk);
get_next_()->resume( lk);
}
bool