From 11eb0c6082b7db00a88dd3a2a73191cb9b477bde Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Wed, 19 Jun 2013 18:03:48 +0200 Subject: [PATCH] add boost.asio spawn() for fibers --- examples/asio/echo_server2.cpp | 23 +- include/boost/fiber/all.hpp | 1 + include/boost/fiber/asio/detail/spawn.hpp | 351 ++++++++++++++++++++++ include/boost/fiber/asio/spawn.hpp | 237 +++++++++++++++ include/boost/fiber/asio/use_future.hpp | 1 + include/boost/fiber/detail/scheduler.hpp | 5 + 6 files changed, 609 insertions(+), 9 deletions(-) create mode 100644 include/boost/fiber/asio/detail/spawn.hpp create mode 100644 include/boost/fiber/asio/spawn.hpp diff --git a/examples/asio/echo_server2.cpp b/examples/asio/echo_server2.cpp index ecf7660f..867fa5c5 100644 --- a/examples/asio/echo_server2.cpp +++ b/examples/asio/echo_server2.cpp @@ -38,13 +38,16 @@ public: void go() { - boost::fibers::fiber( - boost::bind( & session::timeout, shared_from_this() ) ).detach(); - echo(); + boost::fibers::asio::spawn(strand_, + boost::bind(&session::echo, + shared_from_this(), _1)); + boost::fibers::asio::spawn(strand_, + boost::bind(&session::timeout, + shared_from_this(), _1)); } private: - void echo() + void echo( boost::fibers::asio::yield_context yield) { try { @@ -69,7 +72,7 @@ private: } } - void timeout() + void timeout( boost::fibers::asio::yield_context yield) { while ( socket_.is_open() ) { @@ -87,7 +90,8 @@ private: boost::asio::deadline_timer timer_; }; -void do_accept( boost::asio::io_service & io_service, unsigned short port) +void do_accept(boost::asio::io_service& io_service, + unsigned short port, boost::fibers::asio::yield_context yield) { tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); @@ -119,10 +123,11 @@ int main( int argc, char* argv[]) boost::fibers::scheduling_algorithm( & ds); using namespace std; // For atoi. - boost::fibers::fiber fiber( - boost::bind( do_accept, boost::ref( io_service), atoi( argv[1]) ) ); + boost::fibers::asio::spawn( io_service, + boost::bind( do_accept, + boost::ref( io_service), atoi( argv[1]), _1) ); + io_service.run(); - fiber.join(); } catch ( std::exception const& e) { std::cerr << "Exception: " << e.what() << "\n"; } diff --git a/include/boost/fiber/all.hpp b/include/boost/fiber/all.hpp index aec32bfa..54fe0ad4 100644 --- a/include/boost/fiber/all.hpp +++ b/include/boost/fiber/all.hpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include diff --git a/include/boost/fiber/asio/detail/spawn.hpp b/include/boost/fiber/asio/detail/spawn.hpp new file mode 100644 index 00000000..4c11c690 --- /dev/null +++ b/include/boost/fiber/asio/detail/spawn.hpp @@ -0,0 +1,351 @@ +// +// detail/spawn.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP +#define BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +namespace detail { + +template< typename Handler, typename T > +class fiber_handler +{ +public: + fiber_handler( basic_yield_context< Handler > ctx) : + fiber_( ctx.fiber_), + handler_( ctx.handler_), + ec_( ctx.ec_), + value_( 0) + {} + + void operator()( T value) + { + * ec_ = boost::system::error_code(); + * value_ = value; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + + void operator()( boost::system::error_code ec, T value) + { + * ec_ = ec; + * value_ = value; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + +//private: + boost::fibers::detail::fiber_base::ptr_t fiber_; + Handler & handler_; + boost::system::error_code * ec_; + T * value_; +}; + +template< typename Handler > +class fiber_handler< Handler, void > + +{ +public: + fiber_handler( basic_yield_context< Handler > ctx) : + fiber_( ctx.fiber_), + handler_( ctx.handler_), + ec_( ctx.ec_) + {} + + void operator()() + { + * ec_ = boost::system::error_code(); + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + + void operator()( boost::system::error_code ec) + { + * ec_ = ec; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + +//private: + boost::fibers::detail::fiber_base::ptr_t fiber_; + Handler & handler_; + boost::system::error_code * ec_; +}; + +template< typename Handler, typename T > +void* asio_handler_allocate( std::size_t size, + fiber_handler< Handler, T > * this_handler) +{ + return boost_asio_handler_alloc_helpers::allocate( + size, this_handler->handler_); +} + +template< typename Handler, typename T > +void asio_handler_deallocate( void* pointer, std::size_t size, + fiber_handler< Handler, T > * this_handler) +{ + boost_asio_handler_alloc_helpers::deallocate( + pointer, size, this_handler->handler_); +} + +template< typename Handler, typename T > +bool asio_handler_is_continuation( fiber_handler *) +{ return true; } + +template< typename Function, typename Handler, typename T > +void asio_handler_invoke( Function & function, + fiber_handler< Handler, T > * this_handler) +{ + boost_asio_handler_invoke_helpers::invoke( + function, this_handler->handler_); +} + +template< typename Function, typename Handler, typename T > +void asio_handler_invoke( Function const& function, + fiber_handler< Handler, T > * this_handler) +{ + boost_asio_handler_invoke_helpers::invoke( + function, this_handler->handler_); +} + +} // namespace detail +} // namespace asio +} // namespace fibers + +namespace asio { + +#if !defined(GENERATING_DOCUMENTATION) + +template< typename Handler, typename ReturnType > +struct handler_type< + boost::fibers::asio::basic_yield_context< Handler >, + ReturnType() +> +{ typedef boost::fibers::asio::detail::fiber_handler< Handler, void > type; }; + +template< typename Handler, typename ReturnType, typename Arg1 > +struct handler_type< + boost::fibers::asio::basic_yield_context< Handler >, + ReturnType( Arg1) +> +{ typedef boost::fibers::asio::detail::fiber_handler< Handler, Arg1 > type; }; + +template< typename Handler, typename ReturnType > +struct handler_type< + boost::fibers::asio::basic_yield_context< Handler >, + ReturnType( boost::system::error_code) +> +{ typedef boost::fibers::asio::detail::fiber_handler type; }; + +template< typename Handler, typename ReturnType, typename Arg2 > +struct handler_type< + boost::fibers::asio::basic_yield_context< Handler >, + ReturnType( boost::system::error_code, Arg2) +> +{ typedef boost::fibers::asio::detail::fiber_handler< Handler, Arg2 > type; }; + +template< typename Handler, typename T > +class async_result< boost::fibers::asio::detail::fiber_handler< Handler, T > > +{ +public: + typedef T type; + + explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, T > & h) : + out_ec_( 0), ec_(), value_() + { + out_ec_ = h.ec_; + if ( ! out_ec_) h.ec_ = & ec_; + h.value_ = & value_; + } + + type get() + { + boost::fibers::detail::scheduler::instance().active()->set_waiting(); + boost::fibers::detail::scheduler::instance().active()->suspend(); + if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_); + return value_; + } + +private: + boost::system::error_code * out_ec_; + boost::system::error_code ec_; + type value_; +}; + +template< typename Handler > +class async_result< boost::fibers::asio::detail::fiber_handler< Handler, void > > +{ +public: + typedef void type; + + explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, void > & h) : + out_ec_( 0), ec_() + { + out_ec_ = h.ec_; + if (!out_ec_) h.ec_ = &ec_; + } + + void get() + { + boost::fibers::detail::scheduler::instance().active()->set_waiting(); + boost::fibers::detail::scheduler::instance().active()->suspend(); + if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_); + } + +private: + boost::system::error_code * out_ec_; + boost::system::error_code ec_; +}; + +} // namespace asio + +namespace fibers { +namespace asio { +namespace detail { + +template< typename Handler, typename Function > +struct spawn_data : private noncopyable +{ + spawn_data( BOOST_ASIO_MOVE_ARG( Handler) handler, + bool call_handler, BOOST_ASIO_MOVE_ARG( Function) function) : + handler_( BOOST_ASIO_MOVE_CAST( Handler)( handler) ), + call_handler_( call_handler), + function_( BOOST_ASIO_MOVE_CAST( Function)( function) ) + {} + + boost::fibers::detail::fiber_base::ptr_t fiber_; + Handler handler_; + bool call_handler_; + Function function_; +}; + +template< typename Handler, typename Function > +struct fiber_entry_point +{ + void operator()() + { + shared_ptr< spawn_data< Handler, Function > > data( data_); + boost::fibers::detail::scheduler::instance().active()->set_waiting(); + boost::fibers::detail::scheduler::instance().active()->suspend(); + const basic_yield_context< Handler > yield( + data->fiber_, data->handler_); + ( data->function_)( yield); + if ( data->call_handler_) + ( data->handler_)(); + } + + shared_ptr< spawn_data< Handler, Function > > data_; +}; + +template< typename Handler, typename Function > +struct spawn_helper +{ + void operator()() + { + fiber_entry_point< Handler, Function > entry_point = { data_ }; + boost::fibers::fiber fiber( entry_point, attributes_); + data_->fiber_ = boost::fibers::detail::scheduler::extract( fiber); + fiber.detach(); + data_->fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( data_->fiber_); + } + + shared_ptr< spawn_data< Handler, Function > > data_; + boost::fibers::attributes attributes_; +}; + +inline void default_spawn_handler() {} + +} // namespace detail + +template< typename Handler, typename Function > +void spawn( BOOST_ASIO_MOVE_ARG( Handler) handler, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes) +{ + detail::spawn_helper< Handler, Function > helper; + helper.data_.reset( + new detail::spawn_data< Handler, Function >( + BOOST_ASIO_MOVE_CAST( Handler)( handler), true, + BOOST_ASIO_MOVE_CAST( Function)( function) ) ); + helper.attributes_ = attributes; + boost_asio_handler_invoke_helpers::invoke( + helper, helper.data_->handler_); +} + +template< typename Handler, typename Function > +void spawn( basic_yield_context< Handler > ctx, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes) +{ + Handler handler( ctx.handler_); // Explicit copy that might be moved from. + detail::spawn_helper< Handler, Function > helper; + helper.data_.reset( + new detail::spawn_data< Handler, Function >( + BOOST_ASIO_MOVE_CAST( Handler)( handler), false, + BOOST_ASIO_MOVE_CAST( Function)( function) ) ); + helper.attributes_ = attributes; + boost_asio_handler_invoke_helpers::invoke( + helper, helper.data_->handler_); +} + +template< typename Function > +void spawn( boost::asio::io_service::strand strand, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes) +{ + boost::fibers::asio::spawn( + strand.wrap( & detail::default_spawn_handler), + BOOST_ASIO_MOVE_CAST( Function)( function), + attributes); +} + +template< typename Function > +void spawn( boost::asio::io_service & io_service, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes) +{ + boost::fibers::asio::spawn( + boost::asio::io_service::strand( io_service), + BOOST_ASIO_MOVE_CAST( Function)( function), + attributes); +} + +#endif // !defined(GENERATING_DOCUMENTATION) + +} // namespace asio +} // namespace fibers +} // namespace boost + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP diff --git a/include/boost/fiber/asio/spawn.hpp b/include/boost/fiber/asio/spawn.hpp new file mode 100644 index 00000000..cf1340a8 --- /dev/null +++ b/include/boost/fiber/asio/spawn.hpp @@ -0,0 +1,237 @@ +// +// spawn.hpp +// ~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_ASIO_SPAWN_HPP +#define BOOST_FIBERS_ASIO_SPAWN_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +/// Context object the represents the currently executing fiber. +/** + * The basic_yield_context class is used to represent the currently executing + * fiber. A basic_yield_context may be passed as a handler to an * asynchronous + * operation. For example: + * + * @code template< typename Handler > + * void my_fiber( basic_yield_context< Handler > yield) + * { + * ... + * std::size_t n = my_socket.async_read_some( buffer, yield); + * ... + * } @endcode + * + * The initiating function (async_read_some in the above example) suspends the + * current fiber. The fiber is resumed when the asynchronous operation + * completes, and the result of the operation is returned. + */ +template< typename Handler > +class basic_yield_context +{ +public: + /// Construct a yield context to represent the specified fiber. + /** + * Most applications do not need to use this constructor. Instead, the + * spawn() function passes a yield context as an argument to the fiber + * function. + */ + basic_yield_context( + boost::fibers::detail::fiber_base::ptr_t const& fib, + Handler& handler) : + fiber_( fib), + handler_( handler), + ec_( 0) + {} + + /// Return a yield context that sets the specified error_code. + /** + * By default, when a yield context is used with an asynchronous operation, a + * non-success error_code is converted to system_error and thrown. This + * operator may be used to specify an error_code object that should instead be + * set with the asynchronous operation's result. For example: + * + * @code template< typename Handler > + * void my_fiber( basic_yield_context< Handler > yield) + * { + * ... + * std::size_t n = my_socket.async_read_some( buffer, yield[ec]); + * if ( ec) + * { + * // An error occurred. + * } + * ... + * } @endcode + */ + basic_yield_context operator[]( boost::system::error_code & ec) + { + basic_yield_context tmp( * this); + tmp.ec_ = & ec; + return tmp; + } + +#if defined(GENERATING_DOCUMENTATION) +private: +#endif // defined(GENERATING_DOCUMENTATION) + boost::fibers::detail::fiber_base::ptr_t fiber_; + Handler & handler_; + boost::system::error_code * ec_; +}; + +#if defined(GENERATING_DOCUMENTATION) +/// Context object the represents the currently executing fiber. +typedef basic_yield_context< unspecified > yield_context; +#else // defined(GENERATING_DOCUMENTATION) +typedef basic_yield_context< + boost::asio::detail::wrapped_handler< + boost::asio::io_service::strand, void(*)(), + boost::asio::detail::is_continuation_if_running> > yield_context; +#endif // defined(GENERATING_DOCUMENTATION) + +/** + * @defgroup spawn boost::fibers::asio::spawn + * + * @brief Start a new stackful fiber. + * + * The spawn() function is a high-level wrapper over the Boost.Fiber + * library. This function enables programs to implement asynchronous logic in a + * synchronous manner, as illustrated by the following example: + * + * @code boost::asio::spawn( my_strand, do_echo); + * + * // ... + * + * void do_echo( boost::fibers::asio::yield_context yield) + * { + * try + * { + * char data[128]; + * for (;;) + * { + * std::size_t length = + * my_socket.async_read_some( + * boost::asio::buffer( data), yield); + * + * boost::asio::async_write( my_socket, + * boost::asio::buffer( data, length), yield); + * } + * } + * catch ( std::exception const& e) + * { + * // ... + * } + * } @endcode + */ +/*@{*/ + +/// Start a new fiber, calling the specified handler when it completes. +/** + * This function is used to launch a new fiber. + * + * @param handler A handler to be called when the fiber exits. More + * importantly, the handler provides an execution context (via the the handler + * invocation hook) for the fiber. The handler must have the signature: + * @code void handler(); @endcode + * + * @param function The fiber function. The function must have the signature: + * @code void function( basic_yield_context< Handler > yield); @endcode + * + * @param attributes Boost.Fiber attributes used to customise the fiber. + */ +template< typename Handler, typename Function > +void spawn( BOOST_ASIO_MOVE_ARG( Handler) handler, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes + = boost::fibers::attributes() ); + +/// Start a new fiber, inheriting the execution context of another. +/** + * This function is used to launch a new fiber. + * + * @param ctx Identifies the current fiber as a parent of the new + * fiber. This specifies that the new fiber should inherit the + * execution context of the parent. For example, if the parent fiber is + * executing in a particular strand, then the new fiber will execute in the + * same strand. + * + * @param function The fiber function. The function must have the signature: + * @code void function( basic_yield_context< Handler > yield); @endcode + * + * @param attributes Boost.Fiber attributes used to customise the fiber. + */ +template< typename Handler, typename Function > +void spawn( basic_yield_context< Handler > ctx, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes + = boost::fibers::attributes() ); + +/// Start a new fiber that executes in the contex of a strand. +/** + * This function is used to launch a new fiber. + * + * @param strand Identifies a strand. By starting multiple fibers on the + * same strand, the implementation ensures that none of those fibers can + * execute simultaneously. + * + * @param function The fiber function. The function must have the signature: + * @code void function( yield_context yield); @endcode + * + * @param attributes Boost.Fiber attributes used to customise the fiber. + */ +template< typename Function > +void spawn( boost::asio::io_service::strand strand, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes + = boost::fibers::attributes() ); + +/// Start a new fiber that executes on a given io_service. +/** + * This function is used to launch a new fiber. + * + * @param io_service Identifies the io_service that will run the fiber. The + * new fiber is implicitly given its own strand within this io_service. + * + * @param function The fiber function. The function must have the signature: + * @code void function( yield_context yield); @endcode + * + * @param attributes Boost.Fiber attributes used to customise the fiber. + */ +template< typename Function > +void spawn( boost::asio::io_service & io_service, + BOOST_ASIO_MOVE_ARG( Function) function, + boost::fibers::attributes const& attributes + = boost::fibers::attributes() ); + +} // namespace asio +} // namespace fibers +} // namespace boost + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include + +#endif // BOOST_FIBERS_ASIO_SPAWN_HPP diff --git a/include/boost/fiber/asio/use_future.hpp b/include/boost/fiber/asio/use_future.hpp index d3e3c9ec..8babf4fc 100644 --- a/include/boost/fiber/asio/use_future.hpp +++ b/include/boost/fiber/asio/use_future.hpp @@ -15,6 +15,7 @@ #include +#include #include #ifdef BOOST_HAS_ABI_HEADERS diff --git a/include/boost/fiber/detail/scheduler.hpp b/include/boost/fiber/detail/scheduler.hpp index 28b4aab7..b3adcf75 100644 --- a/include/boost/fiber/detail/scheduler.hpp +++ b/include/boost/fiber/detail/scheduler.hpp @@ -92,6 +92,11 @@ private: #endif public: + template< typename F > + static fiber_base::ptr_t extract( F const& f) { + return f.impl_; + } + static algorithm & instance() BOOST_NOEXCEPT; static algorithm * replace( algorithm *) BOOST_NOEXCEPT;