2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-02 08:52:07 +00:00

use coroutines inside fiber

This commit is contained in:
Oliver Kowalke
2013-09-09 08:52:26 +02:00
parent 00ec9d32a3
commit e996d2e28f
6 changed files with 130 additions and 402 deletions

View File

@@ -11,7 +11,7 @@ import toolset ;
project boost/fiber
: requirements
<library>/boost/context//boost_context
<library>/boost/coroutine//boost_coroutine
<library>/boost/chrono//boost_chrono
<toolset>gcc-4.7,<segmented-stacks>on:<cxxflags>-fsplit-stack
<toolset>gcc-4.7,<segmented-stacks>on:<linkflags>"-static-libgcc"
@@ -70,7 +70,7 @@ lib boost_fiber
recursive_timed_mutex.cpp
round_robin.cpp
timed_mutex.cpp
: <link>shared:<library>../../context/build//boost_context
: <link>shared:<library>../../coroutine/build//boost_coroutine
<link>shared:<library>../../chrono/build//boost_chrono
;

View File

@@ -10,7 +10,7 @@
project boost/fiber/example/cpp03
: requirements
<library>../build//boost_fiber
<library>/boost/context//boost_context
<library>/boost/coroutine//boost_coroutine
<library>/boost/system//boost_system
<library>/boost/thread//boost_thread
<link>static

View File

@@ -13,12 +13,16 @@
#include <vector>
#include <boost/assert.hpp>
#include <boost/bind.hpp>
#include <boost/config.hpp>
#include <boost/coroutine/all.hpp>
#include <boost/cstdint.hpp>
#include <boost/exception_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/move/move.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/attributes.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/fiber_context.hpp>
#include <boost/fiber/detail/flags.hpp>
@@ -38,7 +42,7 @@ namespace boost {
namespace fibers {
namespace detail {
struct forced_unwind {};
namespace coro = boost::coroutines;
class BOOST_FIBERS_DECL fiber_base : public notify
{
@@ -77,19 +81,58 @@ private:
fss_data_t fss_data_;
protected:
state_t state_;
int flags_;
int priority_;
fiber_context caller_;
fiber_context callee_;
exception_ptr except_;
std::vector< ptr_t > waiting_;
// set terminate is only set inside fiber_base::trampoline_()
void set_terminated_() BOOST_NOEXCEPT
{
state_t previous = TERMINATED;
std::swap( state_, previous);
BOOST_ASSERT( RUNNING == previous);
}
virtual void unwind_stack() = 0;
void trampoline_( coro::coroutine< void >::push_type & c)
{
BOOST_ASSERT( c);
BOOST_ASSERT( ! is_terminated() );
callee_ = & c;
set_running();
suspend();
try
{
BOOST_ASSERT( is_running() );
run();
BOOST_ASSERT( is_running() );
}
catch ( coro::detail::forced_unwind const&)
{
set_terminated_();
release();
throw;
}
catch (...)
{ except_ = current_exception(); }
set_terminated_();
release();
suspend();
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
protected:
state_t state_;
int flags_;
int priority_;
coro::coroutine< void >::pull_type caller_;
coro::coroutine< void >::push_type * callee_;
exception_ptr except_;
std::vector< ptr_t > waiting_;
void release();
virtual void run() = 0;
public:
class id
{
@@ -143,9 +186,39 @@ public:
{ return 0 == impl_; }
};
fiber_base( fiber_context::ctx_fn fn,
stack_context * stack_ctx,
bool preserve_fpu);
template< typename StackAllocator, typename Allocator >
fiber_base( attributes const& attr, StackAllocator const& stack_alloc, Allocator const& alloc) :
fss_data_(),
state_( READY),
flags_( 0),
priority_( 0),
caller_(),
callee_( 0),
except_(),
waiting_()
{
BOOST_ASSERT( ! caller_);
BOOST_ASSERT( ! callee_);
typedef typename Allocator::template rebind<
coro::coroutine< void >::pull_type
>::other allocator_t;
caller_ = coro::coroutine< void >::pull_type(
boost::bind( & fiber_base::trampoline_, this, _1),
coro::attributes(
attr.size,
fpu_preserved == attr.preserve_fpu
? coro::fpu_preserved
: coro::fpu_not_preserved),
stack_alloc,
allocator_t( alloc) );
set_ready(); // fiber is setup and now ready to run
BOOST_ASSERT( caller_);
BOOST_ASSERT( callee_);
}
virtual ~fiber_base();
@@ -164,15 +237,6 @@ public:
bool join( ptr_t const&);
bool force_unwind() const BOOST_NOEXCEPT
{ return 0 != ( flags_ & flag_force_unwind); }
bool unwind_requested() const BOOST_NOEXCEPT
{ return 0 != ( flags_ & flag_unwind_stack); }
bool preserve_fpu() const BOOST_NOEXCEPT
{ return 0 != ( flags_ & flag_preserve_fpu); }
bool interruption_enabled() const BOOST_NOEXCEPT
{ return 0 == ( flags_ & flag_interruption_blocked); }
@@ -210,46 +274,8 @@ public:
bool is_waiting() const BOOST_NOEXCEPT
{ return WAITING == state_; }
// set terminate is only set inside fiber_object::exec()
// it is set after the fiber-function was left == at the end of exec()
void set_terminated() BOOST_NOEXCEPT
{
// other thread could have called set_ready() before
// case: - this fiber has joined another fiber running in another thread,
// - other fiber terminated and releases its joining fibers
// - this fiber was interrupted before and therefore resumed
// and throws fiber_interrupted
// - fiber_interrupted was not catched and swallowed
// - other fiber calls set_ready() on this fiber happend before this
// fiber calls set_terminated()
// - this fiber stack gets unwound and set_terminated() is called at the end
state_t previous = TERMINATED;
std::swap( state_, previous);
BOOST_ASSERT( RUNNING == previous);
//BOOST_ASSERT( RUNNING == previous || READY == previous);
}
void set_ready() BOOST_NOEXCEPT
{
#if 0
// this fiber calls set_ready(): - only transition from WAITING (wake-up)
// - or transition from RUNNING (yield) allowed
// other fiber calls set_ready(): - only if this fiber was joinig other fiber
// - if this fiber was not interrupted then this fiber
// should in WAITING
// - if this fiber was interrupted the this fiber might
// be in READY, RUNNING or already in
// TERMINATED
for (;;)
{
int expected = WAITING;
bool result = state_.compare_exchange_strong( expected, READY);
if ( result || TERMINATED == expected || READY == expected) return;
expected = RUNNING;
result = state_.compare_exchange_strong( expected, READY);
if ( result || TERMINATED == expected || READY == expected) return;
}
#endif
state_t previous = READY;
std::swap( state_, previous);
BOOST_ASSERT( WAITING == previous || RUNNING == previous || READY == previous);
@@ -264,27 +290,11 @@ public:
void set_waiting() BOOST_NOEXCEPT
{
// other thread could have called set_ready() before
// case: - this fiber has joined another fiber running in another thread,
// - other fiber terminated and releases its joining fibers
// - this fiber was interrupted and therefore resumed and
// throws fiber_interrupted
// - fiber_interrupted was catched and swallowed
// - other fiber calls set_ready() on this fiber happend before
// this fiber calls set_waiting()
// - this fiber might wait on some sync. primitive calling
// set_waiting()
state_t previous = WAITING;
std::swap( state_, previous);
BOOST_ASSERT( RUNNING == previous);
//BOOST_ASSERT( RUNNING == previous || READY == previous);
}
bool has_exception() const BOOST_NOEXCEPT
{ return except_; }
void rethrow() const;
void * get_fss_data( void const* vp) const;
void set_fss_data(
@@ -292,6 +302,11 @@ public:
fss_cleanup_function::ptr_t const& cleanup_fn,
void * data,
bool cleanup_existing);
bool has_exception() const BOOST_NOEXCEPT
{ return except_; }
void rethrow() const;
};
}}}

View File

@@ -19,7 +19,6 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/fiber_base.hpp>
#include <boost/fiber/detail/flags.hpp>
#include <boost/fiber/detail/stack_tuple.hpp>
#include <boost/fiber/flags.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
@@ -35,19 +34,8 @@ namespace boost {
namespace fibers {
namespace detail {
template< typename Fiber >
void trampoline( intptr_t vp)
{
BOOST_ASSERT( vp);
Fiber * f( reinterpret_cast< Fiber * >( vp) );
BOOST_ASSERT( f->is_running() );
f->exec();
}
template< typename Fn, typename StackAllocator, typename Allocator >
class fiber_object : private stack_tuple< StackAllocator >,
public fiber_base
class fiber_object : public fiber_base
{
public:
typedef typename Allocator::template rebind<
@@ -57,7 +45,6 @@ public:
>::other allocator_t;
private:
typedef stack_tuple< StackAllocator > pbase_type;
typedef fiber_base base_type;
Fn fn_;
@@ -72,303 +59,38 @@ private:
fiber_object( fiber_object &);
fiber_object & operator=( fiber_object const&);
void enter_()
{
set_running();
caller_.jump(
callee_,
reinterpret_cast< intptr_t >( this),
preserve_fpu() );
BOOST_ASSERT( ! except_);
}
protected:
void unwind_stack() BOOST_NOEXCEPT
{
flags_ |= flag_unwind_stack;
set_running();
caller_.jump(
callee_,
0,
preserve_fpu() );
flags_ &= ~flag_unwind_stack;
BOOST_ASSERT( is_terminated() );
}
public:
#ifndef BOOST_NO_RVALUE_REFERENCES
fiber_object( Fn && fn, attributes const& attr,
StackAllocator const& stack_alloc,
allocator_t const& alloc) :
pbase_type( stack_alloc, attr.size),
base_type(
trampoline< fiber_object >,
& this->stack_ctx,
fpu_preserved == attr.preserve_fpu),
base_type( attr, stack_alloc, alloc),
fn_( forward< Fn >( fn) ),
alloc_( alloc)
{ enter_(); }
{}
#else
fiber_object( Fn fn, attributes const& attr,
StackAllocator const& stack_alloc,
allocator_t const& alloc) :
pbase_type( stack_alloc, attr.size),
base_type(
trampoline< fiber_object >,
& this->stack_ctx,
fpu_preserved == attr.preserve_fpu),
base_type( attr, stack_alloc, alloc),
fn_( fn),
alloc_( alloc)
{ enter_(); }
{}
fiber_object( BOOST_RV_REF( Fn) fn, attributes const& attr,
StackAllocator const& stack_alloc,
allocator_t const& alloc) :
pbase_type( stack_alloc, attr.size),
base_type(
trampoline< fiber_object >,
& this->stack_ctx,
fpu_preserved == attr.preserve_fpu),
base_type( attr, stack_alloc, alloc),
fn_( fn),
alloc_( alloc)
{ enter_(); }
{}
#endif
~fiber_object()
{
if ( ! is_terminated() )
unwind_stack();
}
void exec()
{
BOOST_ASSERT( ! is_terminated() );
try
{
set_ready();
suspend();
BOOST_ASSERT( is_running() );
fn_();
BOOST_ASSERT( is_running() );
}
catch ( forced_unwind const&)
{}
catch (...)
{ except_ = current_exception(); }
set_terminated();
release();
callee_.jump(
caller_,
0,
preserve_fpu() );
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
void deallocate_object()
{ destroy_( alloc_, this); }
};
template< typename Fn, typename StackAllocator, typename Allocator >
class fiber_object< reference_wrapper< Fn >, StackAllocator, Allocator > :
private stack_tuple< StackAllocator >,
public fiber_base
{
public:
typedef typename Allocator::template rebind<
fiber_object<
Fn, StackAllocator, Allocator
>
>::other allocator_t;
private:
typedef stack_tuple< StackAllocator > pbase_type;
typedef fiber_base base_type;
Fn fn_;
allocator_t alloc_;
static void destroy_( allocator_t & alloc, fiber_object * p)
{
alloc.destroy( p);
alloc.deallocate( p, 1);
}
fiber_object( fiber_object &);
fiber_object & operator=( fiber_object const&);
void enter_()
{
set_running();
caller_.jump(
callee_,
reinterpret_cast< intptr_t >( this),
preserve_fpu() );
BOOST_ASSERT( ! except_);
}
protected:
void unwind_stack() BOOST_NOEXCEPT
{
flags_ |= flag_unwind_stack;
set_running();
caller_.jump(
callee_,
0,
preserve_fpu() );
flags_ &= ~flag_unwind_stack;
BOOST_ASSERT( is_terminated() );
}
public:
fiber_object( reference_wrapper< Fn > fn, attributes const& attr,
StackAllocator const& stack_alloc,
allocator_t const& alloc) :
pbase_type( stack_alloc, attr.size),
base_type(
trampoline< fiber_object >,
& this->stack_ctx,
fpu_preserved == attr.preserve_fpu),
fn_( fn),
alloc_( alloc)
{ enter_(); }
~fiber_object()
{
if ( ! is_terminated() )
unwind_stack();
}
void exec()
{
BOOST_ASSERT( ! is_terminated() );
try
{
set_ready();
suspend();
BOOST_ASSERT( is_running() );
fn_();
BOOST_ASSERT( is_running() );
}
catch ( forced_unwind const&)
{}
catch (...)
{ except_ = current_exception(); }
set_terminated();
release();
callee_.jump(
caller_,
0,
preserve_fpu() );
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
void deallocate_object()
{ destroy_( alloc_, this); }
};
template< typename Fn, typename StackAllocator, typename Allocator >
class fiber_object< const reference_wrapper< Fn >, StackAllocator, Allocator > :
private stack_tuple< StackAllocator >,
public fiber_base
{
public:
typedef typename Allocator::template rebind<
fiber_object<
Fn, StackAllocator, Allocator
>
>::other allocator_t;
private:
typedef stack_tuple< StackAllocator > pbase_type;
typedef fiber_base base_type;
Fn fn_;
allocator_t alloc_;
fiber_object( fiber_object &);
fiber_object & operator=( fiber_object const&);
static void destroy_( allocator_t & alloc, fiber_object * p)
{
alloc.destroy( p);
alloc.deallocate( p, 1);
}
void enter_()
{
set_running();
caller_.jump(
callee_,
reinterpret_cast< intptr_t >( this),
preserve_fpu() );
BOOST_ASSERT( ! except_);
}
protected:
void unwind_stack() BOOST_NOEXCEPT
{
flags_ |= flag_unwind_stack;
set_running();
caller_.jump(
callee_,
0,
preserve_fpu() );
flags_ &= ~flag_unwind_stack;
BOOST_ASSERT( is_terminated() );
}
public:
fiber_object( const reference_wrapper< Fn > fn, attributes const& attr,
StackAllocator const& stack_alloc,
allocator_t const& alloc) :
pbase_type( stack_alloc, attr.size),
base_type(
trampoline< fiber_object >,
& this->stack_ctx,
fpu_preserved == attr.preserve_fpu),
fn_( forward< Fn >( fn) ),
alloc_( alloc)
{ enter_(); }
~fiber_object()
{
if ( ! is_terminated() )
unwind_stack();
}
void exec()
{
BOOST_ASSERT( ! is_terminated() );
try
{
set_ready();
suspend();
fn_();
BOOST_ASSERT( is_running() );
fn_();
BOOST_ASSERT( is_running() );
}
catch ( forced_unwind const&)
{}
catch (...)
{ except_ = current_exception(); }
set_terminated();
release();
callee_.jump(
caller_,
0,
preserve_fpu() );
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
void deallocate_object()
{ destroy_( alloc_, this); }
void run()
{ fn_(); }
};
}}}

View File

@@ -13,6 +13,7 @@
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/coroutine/stack_allocator.hpp>
#include <boost/move/move.hpp>
#include <boost/type_traits/decay.hpp>
#include <boost/type_traits/is_convertible.hpp>
@@ -23,7 +24,6 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/fiber_base.hpp>
#include <boost/fiber/detail/fiber_object.hpp>
#include <boost/fiber/stack_allocator.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -36,6 +36,9 @@
namespace boost {
namespace fibers {
namespace coro = boost::coroutines;
namespace detail {
class scheduler;
@@ -77,7 +80,7 @@ public:
typedef void ( * fiber_fn)();
explicit fiber( fiber_fn fn, attributes const& attr = attributes(),
stack_allocator const& stack_alloc = stack_allocator(),
coro::stack_allocator const& stack_alloc = coro::stack_allocator(),
std::allocator< fiber > const& alloc =
std::allocator< fiber >(),
disable_if<
@@ -87,7 +90,7 @@ public:
impl_()
{
typedef detail::fiber_object<
fiber_fn, stack_allocator, std::allocator< fiber >
fiber_fn, coro::stack_allocator, std::allocator< fiber >
> object_t;
object_t::allocator_t a( alloc);
impl_ = ptr_t(
@@ -139,7 +142,7 @@ public:
#endif
template< typename Fn >
explicit fiber( BOOST_RV_REF( Fn) fn, attributes const& attr = attributes(),
stack_allocator const& stack_alloc = stack_allocator(),
coro::stack_allocator const& stack_alloc = coro::stack_allocator(),
std::allocator< fiber > const& alloc =
std::allocator< fiber >(),
typename disable_if<
@@ -149,7 +152,7 @@ public:
impl_()
{
typedef detail::fiber_object<
Fn, stack_allocator, std::allocator< fiber >
Fn, coro::stack_allocator, std::allocator< fiber >
> object_t;
typename object_t::allocator_t a( alloc);
impl_ = ptr_t(
@@ -201,7 +204,7 @@ public:
#else
template< typename Fn >
explicit fiber( Fn fn, attributes const& attr = attributes(),
stack_allocator const& stack_alloc = stack_allocator(),
coro::stack_allocator const& stack_alloc = coro::stack_allocator(),
std::allocator< fiber > const& alloc =
std::allocator< fiber >(),
typename disable_if<
@@ -211,7 +214,7 @@ public:
impl_()
{
typedef detail::fiber_object<
Fn, stack_allocator, std::allocator< fiber >
Fn, coro::stack_allocator, std::allocator< fiber >
> object_t;
typename object_t::allocator_t a( alloc);
impl_ = ptr_t(
@@ -263,7 +266,7 @@ public:
template< typename Fn >
explicit fiber( BOOST_RV_REF( Fn) fn, attributes const& attr = attributes(),
stack_allocator const& stack_alloc = stack_allocator(),
coro::stack_allocator const& stack_alloc = coro::stack_allocator(),
std::allocator< fiber > const& alloc =
std::allocator< fiber >(),
typename disable_if<
@@ -273,7 +276,7 @@ public:
impl_()
{
typedef detail::fiber_object<
Fn, stack_allocator, std::allocator< fiber >
Fn, coro::stack_allocator, std::allocator< fiber >
> object_t;
typename object_t::allocator_t a( alloc);
impl_ = ptr_t(

View File

@@ -20,19 +20,6 @@ namespace boost {
namespace fibers {
namespace detail {
fiber_base::fiber_base( fiber_context::ctx_fn fn,
stack_context * stack_ctx,
bool preserve_fpu) :
fss_data_(),
state_( READY),
flags_( 0),
priority_( 0),
caller_(),
callee_( fn, stack_ctx),
except_(),
waiting_()
{ if ( preserve_fpu) flags_ |= flag_preserve_fpu; }
fiber_base::~fiber_base()
{
BOOST_ASSERT( is_terminated() );
@@ -42,21 +29,22 @@ fiber_base::~fiber_base()
void
fiber_base::resume()
{
BOOST_ASSERT( is_running() );
caller_.jump( callee_, 0, preserve_fpu() );
BOOST_ASSERT( caller_);
BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm
caller_();
if ( has_exception() ) rethrow();
}
void
fiber_base::suspend()
{
callee_.jump( caller_, 0, preserve_fpu() );
BOOST_ASSERT( callee_);
BOOST_ASSERT( * callee_);
BOOST_ASSERT( is_running() );
( * callee_)();
if ( unwind_requested() ) throw forced_unwind();
BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm
}
void
@@ -87,14 +75,6 @@ fiber_base::join( ptr_t const& p)
return true;
}
void
fiber_base::rethrow() const
{
BOOST_ASSERT( has_exception() );
rethrow_exception( except_);
}
void *
fiber_base::get_fss_data( void const* vp) const
{
@@ -133,6 +113,14 @@ fiber_base::set_fss_data(
fss_data( data, cleanup_fn) ) );
}
void
fiber_base::rethrow() const
{
BOOST_ASSERT( has_exception() );
rethrow_exception( except_);
}
}}}
#ifdef BOOST_HAS_ABI_HEADERS