mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-16 13:22:17 +00:00
unwind at shutdown
- scheduler holds a list of managed worker-fibers - ~scheduler() requests stack unwinding for fibers not yet terminated
This commit is contained in:
@@ -98,6 +98,14 @@ typedef intrusive::list_member_hook<
|
||||
>
|
||||
> terminated_hook;
|
||||
|
||||
struct managed_tag;
|
||||
typedef intrusive::list_member_hook<
|
||||
intrusive::tag< managed_tag >,
|
||||
intrusive::link_mode<
|
||||
intrusive::auto_unlink
|
||||
>
|
||||
> worker_hook;
|
||||
|
||||
}
|
||||
|
||||
struct main_context_t {};
|
||||
@@ -117,7 +125,8 @@ private:
|
||||
flag_worker_context = 1 << 3,
|
||||
flag_terminated = 1 << 4,
|
||||
flag_interruption_blocked = 1 << 5,
|
||||
flag_interruption_requested = 1 << 6
|
||||
flag_interruption_requested = 1 << 6,
|
||||
flag_forced_unwind = 1 << 7
|
||||
};
|
||||
|
||||
struct BOOST_FIBERS_DECL fss_data {
|
||||
@@ -156,10 +165,11 @@ private:
|
||||
boost::context::execution_context ctx_;
|
||||
|
||||
public:
|
||||
detail::worker_hook worker_hook_;
|
||||
detail::terminated_hook terminated_hook_;
|
||||
detail::ready_hook ready_hook_;
|
||||
detail::remote_ready_hook remote_ready_hook_;
|
||||
detail::sleep_hook sleep_hook_;
|
||||
detail::terminated_hook terminated_hook_;
|
||||
detail::wait_hook wait_hook_;
|
||||
std::chrono::steady_clock::time_point tp_;
|
||||
|
||||
@@ -259,6 +269,7 @@ public:
|
||||
// invoke fiber function
|
||||
boost::context::detail::invoke_helper( std::move( fn), std::move( tpl) );
|
||||
} catch ( fiber_interrupted const&) {
|
||||
} catch ( forced_unwind const&) {
|
||||
} catch ( ... ) {
|
||||
std::terminate();
|
||||
}
|
||||
@@ -270,9 +281,11 @@ public:
|
||||
suspend();
|
||||
BOOST_ASSERT_MSG( false, "fiber already terminated");
|
||||
}),
|
||||
worker_hook_(),
|
||||
terminated_hook_(),
|
||||
ready_hook_(),
|
||||
remote_ready_hook_(),
|
||||
terminated_hook_(),
|
||||
sleep_hook_(),
|
||||
wait_hook_(),
|
||||
tp_( (std::chrono::steady_clock::time_point::max)() ),
|
||||
fss_data_(),
|
||||
@@ -328,6 +341,14 @@ public:
|
||||
return 0 != ( flags_ & flag_interruption_requested);
|
||||
}
|
||||
|
||||
void request_interruption( bool req) noexcept;
|
||||
|
||||
bool unwinding_requested() const noexcept {
|
||||
return 0 != ( flags_ & flag_forced_unwind);
|
||||
}
|
||||
|
||||
void request_unwinding() noexcept;
|
||||
|
||||
void * get_fss_data( void const * vp) const;
|
||||
|
||||
void set_fss_data(
|
||||
@@ -336,7 +357,7 @@ public:
|
||||
void * data,
|
||||
bool cleanup_existing);
|
||||
|
||||
void request_interruption( bool req) noexcept;
|
||||
bool managed_is_linked();
|
||||
|
||||
bool ready_is_linked();
|
||||
|
||||
@@ -346,6 +367,8 @@ public:
|
||||
|
||||
bool wait_is_linked();
|
||||
|
||||
void managed_unlink();
|
||||
|
||||
void sleep_unlink();
|
||||
|
||||
void wait_unlink();
|
||||
|
||||
@@ -24,6 +24,8 @@
|
||||
namespace boost {
|
||||
namespace fibers {
|
||||
|
||||
struct forced_unwind {};
|
||||
|
||||
class fiber_exception : public std::system_error {
|
||||
public:
|
||||
fiber_exception() :
|
||||
|
||||
@@ -56,13 +56,30 @@ private:
|
||||
detail::terminated_hook,
|
||||
& context::terminated_hook_ >,
|
||||
intrusive::constant_time_size< false > > terminated_queue_t;
|
||||
typedef intrusive::list<
|
||||
context,
|
||||
intrusive::member_hook<
|
||||
context,
|
||||
detail::worker_hook,
|
||||
& context::worker_hook_ >,
|
||||
intrusive::constant_time_size< false > > worker_queue_t;
|
||||
|
||||
context * main_ctx_;
|
||||
intrusive_ptr< context > dispatcher_ctx_;
|
||||
ready_queue_t ready_queue_;
|
||||
remote_ready_queue_t remote_ready_queue_;
|
||||
sleep_queue_t sleep_queue_;
|
||||
// worker-queue contains all context' mananged by this scheduler
|
||||
// except main-context and dispatcher-context
|
||||
// unlink happens on destruction of a context
|
||||
worker_queue_t worker_queue_;
|
||||
// terminated-queue contains context' which have been terminated
|
||||
terminated_queue_t terminated_queue_;
|
||||
// ready-queue contains context' ready to be resumed
|
||||
ready_queue_t ready_queue_;
|
||||
// remote ready-queue contains context' signaled by schedulers
|
||||
// running in other threads
|
||||
remote_ready_queue_t remote_ready_queue_;
|
||||
// sleep-queue cotnains context' whic hahve been called
|
||||
// scheduler::wait_until()
|
||||
sleep_queue_t sleep_queue_;
|
||||
bool shutdown_;
|
||||
detail::autoreset_event ready_queue_ev_;
|
||||
detail::spinlock remote_ready_splk_;
|
||||
|
||||
@@ -58,9 +58,11 @@ context::context( main_context_t) :
|
||||
flags_( flag_main_context),
|
||||
scheduler_( nullptr),
|
||||
ctx_( boost::context::execution_context::current() ),
|
||||
worker_hook_(),
|
||||
terminated_hook_(),
|
||||
ready_hook_(),
|
||||
remote_ready_hook_(),
|
||||
terminated_hook_(),
|
||||
sleep_hook_(),
|
||||
wait_hook_(),
|
||||
tp_( (std::chrono::steady_clock::time_point::max)() ),
|
||||
fss_data_(),
|
||||
@@ -81,9 +83,11 @@ context::context( dispatcher_context_t, boost::context::preallocated const& pall
|
||||
// dispatcher context should never return from scheduler::dispatch()
|
||||
BOOST_ASSERT_MSG( false, "disatcher fiber already terminated");
|
||||
}),
|
||||
worker_hook_(),
|
||||
terminated_hook_(),
|
||||
ready_hook_(),
|
||||
remote_ready_hook_(),
|
||||
terminated_hook_(),
|
||||
sleep_hook_(),
|
||||
wait_hook_(),
|
||||
tp_( (std::chrono::steady_clock::time_point::max)() ),
|
||||
fss_data_(),
|
||||
@@ -196,6 +200,8 @@ void
|
||||
context::set_ready( context * ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != ctx);
|
||||
BOOST_ASSERT( this != ctx);
|
||||
BOOST_ASSERT( nullptr != scheduler_);
|
||||
BOOST_ASSERT( nullptr != ctx->scheduler_);
|
||||
// FIXME: comparing scheduler address' must be synchronized?
|
||||
// what if ctx is migrated between threads
|
||||
// (other scheduler assigned)
|
||||
@@ -227,6 +233,13 @@ context::request_interruption( bool req) noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
context::request_unwinding() noexcept {
|
||||
BOOST_ASSERT( ! is_main_context() );
|
||||
BOOST_ASSERT( ! is_dispatcher_context() );
|
||||
flags_ |= flag_forced_unwind;
|
||||
}
|
||||
|
||||
void *
|
||||
context::get_fss_data( void const * vp) const {
|
||||
uintptr_t key( reinterpret_cast< uintptr_t >( vp) );
|
||||
@@ -263,6 +276,11 @@ context::set_fss_data( void const * vp,
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
context::managed_is_linked() {
|
||||
return worker_hook_.is_linked();
|
||||
}
|
||||
|
||||
bool
|
||||
context::ready_is_linked() {
|
||||
return ready_hook_.is_linked();
|
||||
@@ -283,6 +301,11 @@ context::wait_is_linked() {
|
||||
return wait_hook_.is_linked();
|
||||
}
|
||||
|
||||
void
|
||||
context::managed_unlink() {
|
||||
worker_hook_.unlink();
|
||||
}
|
||||
|
||||
void
|
||||
context::sleep_unlink() {
|
||||
sleep_hook_.unlink();
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
#include "boost/fiber/context.hpp"
|
||||
#include "boost/fiber/exceptions.hpp"
|
||||
|
||||
#ifdef BOOST_HAS_ABI_HEADERS
|
||||
# include BOOST_ABI_PREFIX
|
||||
@@ -24,6 +25,12 @@ void
|
||||
scheduler::resume_( context * active_ctx, context * ctx) {
|
||||
BOOST_ASSERT( nullptr != active_ctx);
|
||||
BOOST_ASSERT( nullptr != ctx);
|
||||
BOOST_ASSERT( main_ctx_ == active_ctx ||
|
||||
dispatcher_ctx_.get() == active_ctx ||
|
||||
active_ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( main_ctx_ == ctx ||
|
||||
dispatcher_ctx_.get() == ctx ||
|
||||
ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( active_ctx->get_scheduler() == ctx->get_scheduler() );
|
||||
// fiber next-to-run is same as current active-fiber
|
||||
// this might happen in context of this_fiber::yield()
|
||||
@@ -35,6 +42,12 @@ scheduler::resume_( context * active_ctx, context * ctx) {
|
||||
// resume active-fiber == ctx
|
||||
ctx->resume();
|
||||
BOOST_ASSERT( context::active() == active_ctx);
|
||||
BOOST_ASSERT( main_ctx_ == active_ctx ||
|
||||
dispatcher_ctx_.get() == active_ctx ||
|
||||
active_ctx->managed_is_linked() );
|
||||
if ( ctx->unwinding_requested() ) {
|
||||
throw forced_unwind();
|
||||
}
|
||||
}
|
||||
|
||||
context *
|
||||
@@ -53,7 +66,16 @@ scheduler::release_terminated_() {
|
||||
for ( terminated_queue_t::iterator i( terminated_queue_.begin() );
|
||||
i != e;) {
|
||||
context * ctx = & ( * i);
|
||||
BOOST_ASSERT( ! ctx->is_main_context() );
|
||||
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
|
||||
BOOST_ASSERT( ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( ctx->is_terminated() );
|
||||
BOOST_ASSERT( ! ctx->ready_is_linked() );
|
||||
BOOST_ASSERT( ! ctx->sleep_is_linked() );
|
||||
i = terminated_queue_.erase( i);
|
||||
// if last reference, e.g. fiber::join() or fiber::detach()
|
||||
// have been already called, this will call ~context(),
|
||||
// the context is automatically removeid from worker-queue
|
||||
intrusive_ptr_release( ctx);
|
||||
}
|
||||
}
|
||||
@@ -65,16 +87,12 @@ scheduler::remote_ready2ready_() {
|
||||
std::unique_lock< detail::spinlock > lk( remote_ready_splk_);
|
||||
remote_ready_queue_.swap( tmp);
|
||||
lk.unlock();
|
||||
// get from remote ready-queue
|
||||
// get context from remote ready-queue
|
||||
while ( ! tmp.empty() ) {
|
||||
context * ctx = & tmp.front();
|
||||
tmp.pop_front();
|
||||
// context must no be contained in ready-queue
|
||||
// no need to check agains active context
|
||||
// because we are in scheduler::dispatch() -> dispatcher-context
|
||||
if ( ! ctx->ready_is_linked() ) {
|
||||
ready_queue_.push_back( * ctx);
|
||||
}
|
||||
// store context in local queues
|
||||
set_ready( ctx);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,6 +106,9 @@ scheduler::sleep2ready_() noexcept {
|
||||
sleep_queue_t::iterator e = sleep_queue_.end();
|
||||
for ( sleep_queue_t::iterator i = sleep_queue_.begin(); i != e;) {
|
||||
context * ctx = & ( * i);
|
||||
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
|
||||
BOOST_ASSERT( main_ctx_ == ctx ||
|
||||
ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( ! ctx->is_terminated() );
|
||||
BOOST_ASSERT( ! ctx->ready_is_linked() );
|
||||
BOOST_ASSERT( ctx->sleep_is_linked() );
|
||||
@@ -102,7 +123,7 @@ scheduler::sleep2ready_() noexcept {
|
||||
// push new context to ready-queue
|
||||
ready_queue_.push_back( * ctx);
|
||||
} else {
|
||||
break; // first element with ctx->tp_ > now, leave for-loop
|
||||
break; // first context with now < deadline
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -110,9 +131,11 @@ scheduler::sleep2ready_() noexcept {
|
||||
scheduler::scheduler() noexcept :
|
||||
main_ctx_( nullptr),
|
||||
dispatcher_ctx_(),
|
||||
worker_queue_(),
|
||||
terminated_queue_(),
|
||||
ready_queue_(),
|
||||
remote_ready_queue_(),
|
||||
terminated_queue_(),
|
||||
sleep_queue_(),
|
||||
shutdown_( false),
|
||||
ready_queue_ev_(),
|
||||
remote_ready_splk_() {
|
||||
@@ -126,6 +149,12 @@ scheduler::~scheduler() noexcept {
|
||||
shutdown_ = true;
|
||||
// resume pending fibers
|
||||
resume_( main_ctx_, get_next_() );
|
||||
// no context' in worker-queue
|
||||
BOOST_ASSERT( worker_queue_.empty() );
|
||||
BOOST_ASSERT( terminated_queue_.empty() );
|
||||
BOOST_ASSERT( ready_queue_.empty() );
|
||||
BOOST_ASSERT( remote_ready_queue_.empty() );
|
||||
BOOST_ASSERT( sleep_queue_.empty() );
|
||||
// deallocate dispatcher-context
|
||||
dispatcher_ctx_.reset();
|
||||
// set main-context to nullptr
|
||||
@@ -135,6 +164,9 @@ scheduler::~scheduler() noexcept {
|
||||
void
|
||||
scheduler::set_main_context( context * main_ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != main_ctx);
|
||||
// main-context represents the execution context created
|
||||
// by the system, e.g. main()- or thread-context
|
||||
// should not be in worker-queue
|
||||
main_ctx_ = main_ctx;
|
||||
main_ctx_->set_scheduler( this);
|
||||
}
|
||||
@@ -142,6 +174,12 @@ scheduler::set_main_context( context * main_ctx) noexcept {
|
||||
void
|
||||
scheduler::set_dispatcher_context( intrusive_ptr< context > dispatcher_ctx) noexcept {
|
||||
BOOST_ASSERT( dispatcher_ctx);
|
||||
// dispatcher context has to handle
|
||||
// - remote ready context'
|
||||
// - sleeping context'
|
||||
// - extern event-loops
|
||||
// - suspending the thread if ready-queue is empty (waiting on external event)
|
||||
// should not be in worker-queue
|
||||
dispatcher_ctx_.swap( dispatcher_ctx);
|
||||
// add dispatcher-context to ready-queue
|
||||
// so it is the first element in the ready-queue
|
||||
@@ -192,8 +230,17 @@ scheduler::dispatch() {
|
||||
resume_( dispatcher_ctx_.get(), ctx);
|
||||
BOOST_ASSERT( context::active() == dispatcher_ctx_.get() );
|
||||
}
|
||||
// interrupt all context' in ready- and sleep-queue
|
||||
release_terminated_();
|
||||
// loop till all context' have been terminated
|
||||
while ( ! worker_queue_.empty() ) {
|
||||
release_terminated_();
|
||||
// force unwinding of all context' in worker-queue
|
||||
worker_queue_t::iterator e = worker_queue_.end();
|
||||
for ( worker_queue_t::iterator i = worker_queue_.begin(); i != e; ++i) {
|
||||
context * ctx = & ( * i);
|
||||
ctx->request_unwinding();
|
||||
set_ready( ctx);
|
||||
}
|
||||
}
|
||||
resume_( dispatcher_ctx_.get(), main_ctx_);
|
||||
}
|
||||
|
||||
@@ -201,34 +248,51 @@ void
|
||||
scheduler::set_ready( context * ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != ctx);
|
||||
BOOST_ASSERT( ! ctx->is_terminated() );
|
||||
// if context is already in ready-queue, return
|
||||
// this might happend if a newly create fiber is
|
||||
// signaled to interrupt
|
||||
if ( ctx->ready_is_linked() ) {
|
||||
return;
|
||||
}
|
||||
// we do not test for wait-queue because
|
||||
// context::wait_is_linked() is not sychronized
|
||||
// with other threads
|
||||
//BOOST_ASSERT( active_ctx->wait_is_linked() );
|
||||
// handle newly created context
|
||||
if ( ! ctx->is_main_context() ) {
|
||||
if ( ! ctx->managed_is_linked() ) {
|
||||
// attach context to `this`-scheduler
|
||||
ctx->set_scheduler( this);
|
||||
// push to the worker-queue
|
||||
worker_queue_.push_back( * ctx);
|
||||
}
|
||||
} else {
|
||||
// sanity checks, main-contxt might by signaled
|
||||
// from another thread
|
||||
BOOST_ASSERT( main_ctx_ == ctx);
|
||||
BOOST_ASSERT( this == ctx->get_scheduler() );
|
||||
}
|
||||
// remove context ctx from sleep-queue
|
||||
// (might happen if blocked in timed_mutex::try_lock_until())
|
||||
// FIXME: mabye better done in scheduler::dispatch()
|
||||
if ( ctx->sleep_is_linked() ) {
|
||||
// unlink it from sleep-queue
|
||||
ctx->sleep_unlink();
|
||||
}
|
||||
// set the scheduler for new context
|
||||
ctx->set_scheduler( this);
|
||||
// push new context to ready-queue
|
||||
ready_queue_.push_back( * ctx);
|
||||
// if context is already in ready-queue, do return
|
||||
// this might happend if a newly created fiber was
|
||||
// signaled to interrupt
|
||||
if ( ! ctx->ready_is_linked() ) {
|
||||
// push new context to ready-queue
|
||||
ready_queue_.push_back( * ctx);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
scheduler::set_remote_ready( context * ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != ctx);
|
||||
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
|
||||
BOOST_ASSERT( this == ctx->get_scheduler() );
|
||||
// another thread might signal the main-context
|
||||
// from this thread
|
||||
//BOOST_ASSERT( ! ctx->is_main_context() );
|
||||
// context ctx might in wait-/ready-/sleep-queue
|
||||
// we do not test this in this function
|
||||
// scheduler::dispatcher() has to take care
|
||||
ctx->set_scheduler( this);
|
||||
// protect for concurrent access
|
||||
std::unique_lock< detail::spinlock > lk( remote_ready_splk_);
|
||||
// push new context to remote ready-queue
|
||||
@@ -238,6 +302,9 @@ scheduler::set_remote_ready( context * ctx) noexcept {
|
||||
void
|
||||
scheduler::set_terminated( context * ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != ctx);
|
||||
BOOST_ASSERT( ! ctx->is_main_context() );
|
||||
BOOST_ASSERT( ! ctx->is_dispatcher_context() );
|
||||
BOOST_ASSERT( ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( ctx->is_terminated() );
|
||||
BOOST_ASSERT( ! ctx->ready_is_linked() );
|
||||
BOOST_ASSERT( ! ctx->sleep_is_linked() );
|
||||
@@ -251,6 +318,9 @@ scheduler::set_terminated( context * ctx) noexcept {
|
||||
void
|
||||
scheduler::yield( context * active_ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != active_ctx);
|
||||
BOOST_ASSERT( main_ctx_ == active_ctx ||
|
||||
dispatcher_ctx_.get() == active_ctx ||
|
||||
active_ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( ! active_ctx->is_terminated() );
|
||||
BOOST_ASSERT( ! active_ctx->ready_is_linked() );
|
||||
BOOST_ASSERT( ! active_ctx->sleep_is_linked() );
|
||||
@@ -267,6 +337,9 @@ bool
|
||||
scheduler::wait_until( context * active_ctx,
|
||||
std::chrono::steady_clock::time_point const& sleep_tp) noexcept {
|
||||
BOOST_ASSERT( nullptr != active_ctx);
|
||||
BOOST_ASSERT( main_ctx_ == active_ctx ||
|
||||
dispatcher_ctx_.get() == active_ctx ||
|
||||
active_ctx->managed_is_linked() );
|
||||
BOOST_ASSERT( ! active_ctx->is_terminated() );
|
||||
// if the active-fiber running in this thread calls
|
||||
// condition:wait() and code in another thread calls
|
||||
@@ -294,6 +367,9 @@ scheduler::wait_until( context * active_ctx,
|
||||
void
|
||||
scheduler::re_schedule( context * active_ctx) noexcept {
|
||||
BOOST_ASSERT( nullptr != active_ctx);
|
||||
BOOST_ASSERT( main_ctx_ == active_ctx ||
|
||||
dispatcher_ctx_.get() == active_ctx ||
|
||||
active_ctx->managed_is_linked() );
|
||||
// resume another context
|
||||
resume_( active_ctx, get_next_() );
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user