From b0324573979c776bcbf20fdb1d4427424c87ebda Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sat, 10 Oct 2015 21:49:46 +0200 Subject: [PATCH] asio examples added --- examples/Jamfile.v2 | 7 +- examples/asio/detail/promise_handler.hpp | 172 -------------- examples/asio/detail/spawn.hpp | 188 +++++++-------- examples/asio/detail/use_future.hpp | 144 +++++++++--- examples/asio/detail/yield.hpp | 190 ++++++++++------ examples/asio/echo_client.cpp | 68 +++--- ...{echo_server.cpp => echo_server_spawn.cpp} | 72 +++--- examples/asio/echo_server_yield.cpp | 91 ++++++++ examples/asio/io_service.cpp | 173 -------------- examples/asio/loop.hpp | 43 ---- examples/asio/promise_completion_token.hpp | 88 ------- examples/asio/ps/publisher.cpp | 52 +++++ .../asio/{publish_subscribe => ps}/server.cpp | 214 ++++++++---------- examples/asio/ps/subscriber.cpp | 53 +++++ examples/asio/publish_subscribe/publisher.cpp | 62 ----- .../asio/publish_subscribe/subscriber.cpp | 64 ------ .../{asio_scheduler.hpp => round_robin.hpp} | 41 +++- examples/asio/sleep.cpp | 30 ++- examples/asio/spawn.hpp | 160 +------------ examples/asio/use_future.hpp | 42 ++-- examples/asio/yield.hpp | 95 +------- 21 files changed, 765 insertions(+), 1284 deletions(-) delete mode 100644 examples/asio/detail/promise_handler.hpp rename examples/asio/{echo_server.cpp => echo_server_spawn.cpp} (65%) create mode 100644 examples/asio/echo_server_yield.cpp delete mode 100644 examples/asio/io_service.cpp delete mode 100644 examples/asio/loop.hpp delete mode 100644 examples/asio/promise_completion_token.hpp create mode 100644 examples/asio/ps/publisher.cpp rename examples/asio/{publish_subscribe => ps}/server.cpp (78%) create mode 100644 examples/asio/ps/subscriber.cpp delete mode 100644 examples/asio/publish_subscribe/publisher.cpp delete mode 100644 examples/asio/publish_subscribe/subscriber.cpp rename examples/asio/{asio_scheduler.hpp => round_robin.hpp} (65%) diff --git a/examples/Jamfile.v2 b/examples/Jamfile.v2 index 699d89fc..f4c34fe1 100644 --- a/examples/Jamfile.v2 +++ b/examples/Jamfile.v2 @@ -37,7 +37,12 @@ exe priority : priority.cpp ; exe segmented_stack : segmented_stack.cpp ; exe simple : simple.cpp ; exe wait_stuff : wait_stuff.cpp ; +exe work_sharing : work_sharing.cpp ; exe asio/sleep : asio/sleep.cpp ; exe asio/echo_client : asio/echo_client.cpp ; -exe asio/echo_server : asio/echo_server.cpp ; +exe asio/echo_server_spawn : asio/echo_server_spawn.cpp ; +exe asio/echo_server_yield : asio/echo_server_yield.cpp ; +#exe asio/ps/publisher : asio/ps/publisher.cpp ; +#exe asio/ps/server : asio/ps/server.cpp ; +#exe asio/ps/subscriber : asio/ps/subscriber.cpp ; diff --git a/examples/asio/detail/promise_handler.hpp b/examples/asio/detail/promise_handler.hpp deleted file mode 100644 index 4f612178..00000000 --- a/examples/asio/detail/promise_handler.hpp +++ /dev/null @@ -1,172 +0,0 @@ -// -// promise_handler.hpp -// ~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// modified by Oliver Kowalke and Nat Goodspeed -// - -#ifndef BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP -#define BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP - -#include - -#include -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace asio { -namespace detail { - -// Completion handler to adapt a promise as a completion handler. -//[fibers_asio_promise_handler_base -template< typename T > -class promise_handler_base { -public: - typedef std::shared_ptr< boost::fibers::promise< T > > promise_ptr; - - // Construct from any promise_completion_token subclass special value. - template< typename Allocator > - promise_handler_base( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : - promise_( std::make_shared< boost::fibers::promise< T > >( - std::allocator_arg, pct.get_allocator() ) ) -//<- - , ecp_( pct.ec_) -//-> - {} - - bool should_set_value( boost::system::error_code const& ec) { - if ( ! ec) { - // whew, success - return true; - } - -//<- - // ec indicates error - if ( ecp_) { - // promise_completion_token bound an error_code variable: set it - * ecp_ = ec; - // This is the odd case: although there's an error, user code - // expressly forbid us to call set_exception(). We've set the - // bound error code -- but future::get() will wait forever unless - // we kick the promise SOMEHOW. Tell subclass to call set_value() - // anyway. - return true; - } -//-> - // no bound error_code: cause promise_ to throw an exception - promise_->set_exception( - std::make_exception_ptr( - boost::system::system_error( ec) ) ); - // caller should NOT call set_value() - return false; - } - - promise_ptr get_promise() const { - return promise_; - } - -private: - promise_ptr promise_; -//<- - boost::system::error_code * ecp_; -//-> -}; -//] - -// generic promise_handler for arbitrary value -//[fibers_asio_promise_handler -template< typename T > -class promise_handler : public promise_handler_base< T > { -private: -//<- - using promise_handler_base< T >::should_set_value; - -//-> -public: - // Construct from any promise_completion_token subclass special value. - template< typename Allocator > - promise_handler( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : - promise_handler_base< T >( pct) { - } - -//<- - void operator()( T t) { - get_promise()->set_value( t); - } -//-> - void operator()( boost::system::error_code const& ec, T t) { - if ( should_set_value( ec) ) { - get_promise()->set_value( t); - } - } -//<- - using typename promise_handler_base< T >::promise_ptr; - using promise_handler_base< T >::get_promise; -//-> -}; -//] - -// specialize promise_handler for void -template<> -class promise_handler< void > : public promise_handler_base< void > { -private: - using promise_handler_base< void >::should_set_value; - -public: - // Construct from any promise_completion_token subclass special value. - template< typename Allocator > - promise_handler( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : - promise_handler_base< void >( pct) { - } - - void operator()() { - get_promise()->set_value(); - } - - void operator()( boost::system::error_code const& ec) { - if ( should_set_value( ec) ) { - get_promise()->set_value(); - } - } - - using promise_handler_base< void >::promise_ptr; - using promise_handler_base< void >::get_promise; -}; - -}}} - -namespace asio { -namespace detail { - -// Specialize asio_handler_invoke hook to ensure that any exceptions thrown -// from the handler are propagated back to the caller via the future. -template< typename Function, typename T > -void asio_handler_invoke( Function f, fibers::asio::detail::promise_handler< T > * h) { - typename fibers::asio::detail::promise_handler< T >::promise_ptr - p( h->get_promise() ); - try { - f(); - } catch (...) { - p->set_exception( std::current_exception() ); - } -} - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP diff --git a/examples/asio/detail/spawn.hpp b/examples/asio/detail/spawn.hpp index 85c6edc4..18f92718 100644 --- a/examples/asio/detail/spawn.hpp +++ b/examples/asio/detail/spawn.hpp @@ -21,7 +21,8 @@ #include #include -#include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -30,29 +31,25 @@ namespace boost { namespace fibers { namespace asio { - namespace detail { template< typename Handler, typename T > -class fiber_handler -{ +class fiber_handler { public: fiber_handler( basic_yield_context< Handler > ctx) : ctx_( ctx.ctx_), handler_( ctx.handler_), ec_( ctx.ec_), - value_( 0) - {} + value_( 0) { + } - void operator()( T value) - { + void operator()( T value) { * ec_ = boost::system::error_code(); * value_ = value; boost::fibers::context::active()->set_ready( ctx_); } - void operator()( boost::system::error_code ec, T value) - { + void operator()( boost::system::error_code ec, T value) { * ec_ = ec; * value_ = value; boost::fibers::context::active()->set_ready( ctx_); @@ -66,23 +63,20 @@ public: }; template< typename Handler > -class fiber_handler< Handler, void > -{ +class fiber_handler< Handler, void > { public: fiber_handler( basic_yield_context< Handler > ctx) : ctx_( ctx.ctx_), handler_( ctx.handler_), - ec_( ctx.ec_) - {} + ec_( ctx.ec_) { + } - void operator()() - { + void operator()() { * ec_ = boost::system::error_code(); boost::fibers::context::active()->set_ready( ctx_); } - void operator()( boost::system::error_code ec) - { + void operator()( boost::system::error_code ec) { * ec_ = ec; boost::fibers::context::active()->set_ready( ctx_); } @@ -95,48 +89,41 @@ public: template< typename Handler, typename T > void* asio_handler_allocate( std::size_t size, - fiber_handler< Handler, T > * this_handler) -{ + fiber_handler< Handler, T > * this_handler) { return boost_asio_handler_alloc_helpers::allocate( size, this_handler->handler_); } template< typename Handler, typename T > void asio_handler_deallocate( void* pointer, std::size_t size, - fiber_handler< Handler, T > * this_handler) -{ + fiber_handler< Handler, T > * this_handler) { boost_asio_handler_alloc_helpers::deallocate( pointer, size, this_handler->handler_); } template< typename Handler, typename T > -bool asio_handler_is_continuation( fiber_handler *) -{ return true; } +bool asio_handler_is_continuation( fiber_handler *) { + return true; +} template< typename Function, typename Handler, typename T > void asio_handler_invoke( Function & function, - fiber_handler< Handler, T > * this_handler) -{ + fiber_handler< Handler, T > * this_handler) { boost_asio_handler_invoke_helpers::invoke( function, this_handler->handler_); } template< typename Function, typename Handler, typename T > void asio_handler_invoke( Function const& function, - fiber_handler< Handler, T > * this_handler) -{ + fiber_handler< Handler, T > * this_handler) { boost_asio_handler_invoke_helpers::invoke( function, this_handler->handler_); } -} // namespace detail -} // namespace asio -} // namespace fibers +}}} namespace asio { -#if !defined(GENERATING_DOCUMENTATION) - template< typename Handler, typename ReturnType > struct handler_type< boost::fibers::asio::basic_yield_context< Handler >, @@ -166,23 +153,24 @@ struct handler_type< { typedef boost::fibers::asio::detail::fiber_handler< Handler, Arg2 > type; }; template< typename Handler, typename T > -class async_result< boost::fibers::asio::detail::fiber_handler< Handler, T > > -{ +class async_result< boost::fibers::asio::detail::fiber_handler< Handler, T > > { public: typedef T type; explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, T > & h) : - out_ec_( 0), ec_(), value_() - { + out_ec_( nullptr), ec_(), value_() { out_ec_ = h.ec_; - if ( ! out_ec_) h.ec_ = & ec_; + if ( ! out_ec_) { + h.ec_ = & ec_; + } h.value_ = & value_; } - type get() - { + type get() { boost::fibers::context::active()->suspend(); - if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_); + if ( ! out_ec_ && ec_) { + throw boost::system::system_error( ec_); + } return value_; } @@ -193,22 +181,23 @@ private: }; template< typename Handler > -class async_result< boost::fibers::asio::detail::fiber_handler< Handler, void > > -{ +class async_result< boost::fibers::asio::detail::fiber_handler< Handler, void > > { public: typedef void type; explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, void > & h) : - out_ec_( 0), ec_() - { + out_ec_( nullptr), ec_() { out_ec_ = h.ec_; - if (!out_ec_) h.ec_ = &ec_; + if ( ! out_ec_) { + h.ec_ = &ec_; + } } - void get() - { + void get() { boost::fibers::context::active()->suspend(); - if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_); + if ( ! out_ec_ && ec_) { + throw boost::system::system_error( ec_); + } } private: @@ -216,22 +205,21 @@ private: boost::system::error_code ec_; }; -} // namespace asio +} namespace fibers { namespace asio { namespace detail { template< typename Handler, typename Function > -struct spawn_data : private noncopyable -{ +struct spawn_data : private noncopyable { spawn_data( boost::asio::io_service& io_svc, BOOST_ASIO_MOVE_ARG( Handler) handler, bool call_handler, BOOST_ASIO_MOVE_ARG( Function) function) : io_svc_(io_svc), handler_( BOOST_ASIO_MOVE_CAST( Handler)( handler) ), call_handler_( call_handler), - function_( BOOST_ASIO_MOVE_CAST( Function)( function) ) - {} + function_( BOOST_ASIO_MOVE_CAST( Function)( function) ) { + } boost::asio::io_service & io_svc_; boost::fibers::context * ctx_; @@ -241,93 +229,83 @@ struct spawn_data : private noncopyable }; template< typename Handler, typename Function > -struct fiber_entry_point -{ - void operator()() - { - shared_ptr< spawn_data< Handler, Function > > data( data_); - data->ctx_ = boost::fibers::context::active(); - const basic_yield_context< Handler > yield( - data->ctx_, data->handler_); +struct fiber_entry_point { + void operator()() { + shared_ptr< spawn_data< Handler, Function > > data( data_); + data->ctx_ = boost::fibers::context::active(); + const basic_yield_context< Handler > yield( + data->ctx_, data->handler_); - boost::asio::io_service::work w(data->io_svc_); - ( data->function_)( yield); - if ( data->call_handler_) - ( data->handler_)(); - } + boost::asio::io_service::work w( data->io_svc_); + ( data->function_)( yield); + if ( data->call_handler_) { + ( data->handler_)(); + } + } - shared_ptr< spawn_data< Handler, Function > > data_; + shared_ptr< spawn_data< Handler, Function > > data_; }; template< typename Handler, typename Function > -struct spawn_helper -{ - void operator()() - { - fiber_entry_point< Handler, Function > entry_point = { data_ }; - boost::fibers::fiber( entry_point).detach(); - } +struct spawn_helper { + void operator()() { + fiber_entry_point< Handler, Function > entry_point = { data_ }; + boost::fibers::fiber( entry_point).detach(); + boost::this_fiber::yield(); + } - shared_ptr< spawn_data< Handler, Function > > data_; + shared_ptr< spawn_data< Handler, Function > > data_; }; inline void default_spawn_handler() {} -} // namespace detail +} template< typename Handler, typename Function > void spawn( boost::asio::io_service& io_service, - BOOST_ASIO_MOVE_ARG( Handler) handler, - BOOST_ASIO_MOVE_ARG( Function) function) -{ + BOOST_ASIO_MOVE_ARG( Handler) handler, + BOOST_ASIO_MOVE_ARG( Function) function) { detail::spawn_helper< Handler, Function > helper; helper.data_.reset( - new detail::spawn_data< Handler, Function >( - io_service, - BOOST_ASIO_MOVE_CAST( Handler)( handler), true, - BOOST_ASIO_MOVE_CAST( Function)( function) ) ); + new detail::spawn_data< Handler, Function >( + io_service, + BOOST_ASIO_MOVE_CAST( Handler)( handler), true, + BOOST_ASIO_MOVE_CAST( Function)( function) ) ); boost_asio_handler_invoke_helpers::invoke( - helper, helper.data_->handler_); + helper, helper.data_->handler_); } template< typename Handler, typename Function > void spawn( basic_yield_context< Handler > ctx, - BOOST_ASIO_MOVE_ARG( Function) function) -{ + BOOST_ASIO_MOVE_ARG( Function) function) { Handler handler( ctx.handler_); // Explicit copy that might be moved from. detail::spawn_helper< Handler, Function > helper; helper.data_.reset( - new detail::spawn_data< Handler, Function >( - BOOST_ASIO_MOVE_CAST( Handler)( handler), false, - BOOST_ASIO_MOVE_CAST( Function)( function) ) ); + new detail::spawn_data< Handler, Function >( + BOOST_ASIO_MOVE_CAST( Handler)( handler), false, + BOOST_ASIO_MOVE_CAST( Function)( function) ) ); boost_asio_handler_invoke_helpers::invoke( - helper, helper.data_->handler_); + helper, helper.data_->handler_); } template< typename Function > void spawn( boost::asio::io_service::strand strand, - BOOST_ASIO_MOVE_ARG( Function) function) -{ + BOOST_ASIO_MOVE_ARG( Function) function) { boost::fibers::asio::spawn( - strand.get_io_service(), - strand.wrap( & detail::default_spawn_handler), - BOOST_ASIO_MOVE_CAST( Function)( function)); + strand.get_io_service(), + strand.wrap( & detail::default_spawn_handler), + BOOST_ASIO_MOVE_CAST( Function)( function)); } template< typename Function > void spawn( boost::asio::io_service & io_service, - BOOST_ASIO_MOVE_ARG( Function) function) -{ + BOOST_ASIO_MOVE_ARG( Function) function) { boost::fibers::asio::spawn( - boost::asio::io_service::strand( io_service), - BOOST_ASIO_MOVE_CAST( Function)( function)); + boost::asio::io_service::strand( io_service), + BOOST_ASIO_MOVE_CAST( Function)( function)); } -#endif // !defined(GENERATING_DOCUMENTATION) - -} // namespace asio -} // namespace fibers -} // namespace boost +}}} #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_SUFFIX diff --git a/examples/asio/detail/use_future.hpp b/examples/asio/detail/use_future.hpp index 349e2e09..ac374842 100644 --- a/examples/asio/detail/use_future.hpp +++ b/examples/asio/detail/use_future.hpp @@ -7,48 +7,118 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -// modified by Oliver Kowalke and Nat Goodspeed +// modified by Oliver Kowalke // #ifndef BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP #define BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP +#include +#include + #include +#include #include +#include +#include -#include - -#include "promise_handler.hpp" +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX #endif namespace boost { -namespace fibers { namespace asio { namespace detail { -// use_future_handler is just an alias for promise_handler -- but we must -// distinguish this case to specialize async_result below. +// Completion handler to adapt a promise as a completion handler. template< typename T > -using use_future_handler = promise_handler< T >; +class promise_handler { +public: + // Construct from use_future special value. + template< typename Allocator > + promise_handler( boost::fibers::asio::use_future_t< Allocator > uf) : + promise_( new boost::fibers::promise< T >( std::allocator_arg, uf.get_allocator() ) ) { + } -}}} + void operator()( T t) { + promise_->set_value( t); + boost::this_fiber::yield(); + } -namespace asio { + void operator()( boost::system::error_code const& ec, T t) { + if ( ec) { + promise_->set_exception( + std::make_exception_ptr( + boost::system::system_error( ec) ) ); + } else { + promise_->set_value( t); + } + boost::this_fiber::yield(); + } -// Handler traits specialisation for use_future_handler. + //private: + boost::shared_ptr< boost::fibers::promise< T > > promise_; +}; + +// Completion handler to adapt a void promise as a completion handler. +template<> +class promise_handler< void > { +public: + // Construct from use_future special value. Used during rebinding. + template< typename Allocator > + promise_handler( boost::fibers::asio::use_future_t< Allocator > uf) : + promise_( new boost::fibers::promise< void > >( std::allocator_arg, uf.get_allocator() ) ) { + } + + void operator()() { + promise_->set_value(); + boost::this_fiber::yield(); + } + + void operator()( boost::system::error_code const& ec) { + if ( ec) { + promise_->set_exception( + std::make_exception_ptr( + boost::system::system_error( ec) ) ); + } else { + promise_->set_value(); + } + boost::this_fiber::yield(); + } + + //private: + boost::shared_ptr< boost::fibers::promise< void > > promise_; +}; + +// Ensure any exceptions thrown from the handler are propagated back to the +// caller via the future. +template< typename Function, typename T > +void asio_handler_invoke( Function f, promise_handler< T > * h) { + boost::shared_ptr< boost::fibers::promise< T > > p( h->promise_); + try { + f(); + } catch ( ... ) { + p->set_exception( std::current_exception() ); + boost::this_fiber::yield(); + } +} + +} + +// Handler traits specialisation for promise_handler. template< typename T > -class async_result< fibers::asio::detail::use_future_handler< T > > { +class async_result< detail::promise_handler< T > > { public: // The initiating function will return a future. typedef boost::fibers::future< T > type; // Constructor creates a new promise for the async operation, and obtains the // corresponding future. - explicit async_result( fibers::asio::detail::use_future_handler< T > & h) { - value_ = h.get_promise()->get_future(); + explicit async_result( detail::promise_handler< T > & h) { + value_ = h.promise_->get_future(); } // Obtain the future to be returned from the initiating function. @@ -60,35 +130,37 @@ private: type value_; }; -// Handler type specialisation for use_future for a nullary callback. +// Handler type specialisation for use_future. template< typename Allocator, typename ReturnType > -struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType() > { - typedef fibers::asio::detail::use_future_handler< void > type; -}; +struct handler_type< + boost::fibers::asio::use_future_t< Allocator>, + ReturnType() +> +{ typedef detail::promise_handler< void > type; }; -// Handler type specialisation for use_future for a single-argument callback. +// Handler type specialisation for use_future. template< typename Allocator, typename ReturnType, typename Arg1 > -struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( Arg1) > { - typedef fibers::asio::detail::use_future_handler< Arg1 > type; -}; +struct handler_type< + boost::fibers::asio::use_future_t< Allocator >, + ReturnType( Arg1) +> +{ typedef detail::promise_handler< Arg1 > type; }; -// Handler type specialisation for use_future for a callback passed only -// boost::system::error_code. Note the use of use_future_handler: an -// error_code indicating error will be conveyed to consumer code via -// set_exception(). +// Handler type specialisation for use_future. template< typename Allocator, typename ReturnType > -struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( boost::system::error_code) > { - typedef fibers::asio::detail::use_future_handler< void > type; -}; +struct handler_type< + boost::fibers::asio::use_future_t< Allocator >, + ReturnType( boost::system::error_code) +> +{ typedef detail::promise_handler< void > type; }; -// Handler type specialisation for use_future for a callback passed -// boost::system::error_code plus an arbitrary value. Note the use of a -// single-argument use_future_handler: an error_code indicating error will be -// conveyed to consumer code via set_exception(). +// Handler type specialisation for use_future. template< typename Allocator, typename ReturnType, typename Arg2 > -struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( boost::system::error_code, Arg2) > { - typedef fibers::asio::detail::use_future_handler< Arg2 > type; -}; +struct handler_type< + boost::fibers::asio::use_future_t< Allocator >, + ReturnType( boost::system::error_code, Arg2) +> +{ typedef detail::promise_handler< Arg2 > type; }; }} diff --git a/examples/asio/detail/yield.hpp b/examples/asio/detail/yield.hpp index f18b1e1a..5cdf1ae1 100644 --- a/examples/asio/detail/yield.hpp +++ b/examples/asio/detail/yield.hpp @@ -1,24 +1,15 @@ -// -// yield.hpp -// ~~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// modified by Oliver Kowalke and Nat Goodspeed -// - #ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP #define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP #include +#include #include +#include +#include +#include -#include - -#include "promise_handler.hpp" +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -29,76 +20,145 @@ namespace fibers { namespace asio { namespace detail { -// yield_handler is just an alias for promise_handler -- but we must -// distinguish this case to specialize async_result below. -//[fibers_asio_yield_handler template< typename T > -using yield_handler = promise_handler< T >; -//] +class yield_handler { +public: + yield_handler( yield_t const& y) : + ctx_( boost::fibers::context::active() ), + ec_( y.ec_), value_( 0) { + } + + void operator()( T t) { + * ec_ = boost::system::error_code(); + * value_ = t; + boost::fibers::context::active()->set_ready( ctx_); + boost::this_fiber::yield(); + } + + void operator()( boost::system::error_code const& ec, T t) { + * ec_ = ec; + * value_ = t; + boost::fibers::context::active()->set_ready( ctx_); + boost::this_fiber::yield(); + } + +//private: + boost::fibers::context * ctx_; + boost::system::error_code * ec_; + T * value_; +}; + +// Completion handler to adapt a void promise as a completion handler. +template<> +class yield_handler< void > { +public: + yield_handler( yield_t const& y) : + ctx_( boost::fibers::context::active() ), + ec_( y.ec_) { + } + + void operator()() { + * ec_ = boost::system::error_code(); + boost::fibers::context::active()->set_ready( ctx_); + boost::this_fiber::yield(); + } + + void operator()( boost::system::error_code const& ec) { + * ec_ = ec; + boost::fibers::context::active()->set_ready( ctx_); + boost::this_fiber::yield(); + } + +//private: + boost::fibers::context * ctx_; + boost::system::error_code * ec_; +}; }}} namespace asio { -// Handler traits specialisation for yield_handler. template< typename T > -class async_result< fibers::asio::detail::yield_handler< T > > { +class async_result< boost::fibers::asio::detail::yield_handler< T > > { public: - // The initiating function will return a value of type T. - typedef T type; - - // Constructor creates a new promise for the async operation, and obtains the - // corresponding future. - explicit async_result( fibers::asio::detail::yield_handler< T > & h) { - future_ = h.get_promise()->get_future(); + typedef T type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) { + out_ec_ = h.ec_; + if ( ! out_ec_) { + h.ec_ = & ec_; + } + h.value_ = & value_; } - // This blocks the calling fiber until the handler sets either a value or - // an exception. type get() { - return future_.get(); + boost::fibers::context::active()->suspend(); + if ( ! out_ec_ && ec_) + throw_exception( boost::system::system_error( ec_) ); + return value_; } private: - fibers::future< T > future_; + boost::system::error_code * out_ec_; + boost::system::error_code ec_; + type value_; }; -// Handler type specialisation for yield for a nullary callback. -template< typename Allocator, typename ReturnType > -struct handler_type< boost::fibers::asio::yield_t< Allocator >, - ReturnType() > { - typedef boost::fibers::asio::detail::yield_handler< void > type; +template<> +class async_result< boost::fibers::asio::detail::yield_handler< void > > { +public: + typedef void type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h) { + out_ec_ = h.ec_; + if ( ! out_ec_) { + h.ec_ = & ec_; + } + } + + void get() { + boost::fibers::context::active()->suspend(); + if ( ! out_ec_ && ec_) { + throw_exception( boost::system::system_error( ec_) ); + } + } + +private: + boost::system::error_code * out_ec_; + boost::system::error_code ec_; }; -// Handler type specialisation for yield for a single-argument callback. -template< typename Allocator, typename ReturnType, typename Arg1 > -struct handler_type< boost::fibers::asio::yield_t< Allocator >, - ReturnType( Arg1) > { - typedef fibers::asio::detail::yield_handler< Arg1 > type; -}; +// Handler type specialisation for use_future. +template< typename ReturnType > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType() +> +{ typedef boost::fibers::asio::detail::yield_handler< void > type; }; -// Handler type specialisation for yield for a callback passed only -// boost::system::error_code. Note the use of yield_handler: an -// error_code indicating error will be conveyed to consumer code via an -// exception. Normal return implies (! error_code). -template< typename Allocator, typename ReturnType > -struct handler_type< boost::fibers::asio::yield_t< Allocator >, - ReturnType( boost::system::error_code) > { - typedef fibers::asio::detail::yield_handler< void > type; -}; +// Handler type specialisation for use_future. +template< typename ReturnType, typename Arg1 > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( Arg1) +> +{ typedef boost::fibers::asio::detail::yield_handler< Arg1 > type; }; -// Handler type specialisation for yield for a callback passed -// boost::system::error_code plus an arbitrary value. Note the use of a -// single-argument yield_handler: an error_code indicating error will be -// conveyed to consumer code via an exception. Normal return implies (! -// error_code). -//[asio_handler_type -template< typename Allocator, typename ReturnType, typename Arg2 > -struct handler_type< boost::fibers::asio::yield_t< Allocator >, - ReturnType( boost::system::error_code, Arg2) > { - typedef fibers::asio::detail::yield_handler< Arg2 > type; -}; -//] +// Handler type specialisation for use_future. +template< typename ReturnType > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( boost::system::error_code) +> +{ typedef boost::fibers::asio::detail::yield_handler< void > type; }; + +// Handler type specialisation for use_future. +template< typename ReturnType, typename Arg2 > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( boost::system::error_code, Arg2) +> +{ typedef boost::fibers::asio::detail::yield_handler< Arg2 > type; }; }} diff --git a/examples/asio/echo_client.cpp b/examples/asio/echo_client.cpp index 91dca760..87fdd012 100644 --- a/examples/asio/echo_client.cpp +++ b/examples/asio/echo_client.cpp @@ -15,49 +15,41 @@ #include #include #include + #include using boost::asio::ip::tcp; -enum { max_length = 1024 }; +enum { + max_length = 1024 +}; -int main(int argc, char* argv[]) -{ - try - { - if (argc != 3) - { - std::cerr << "Usage: echo_client \n"; - return 1; +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: echo_client \n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], argv[2]); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + std::cout << "Enter message: "; + char request[max_length]; + std::cin.getline( request, max_length); + size_t request_length = std::strlen( request); + boost::asio::write( s, boost::asio::buffer( request, request_length) ); + char reply[max_length]; + size_t reply_length = boost::asio::read( s, boost::asio::buffer( reply, request_length) ); + std::cout << "Reply is: "; + std::cout.write( reply, reply_length); + std::cout << "\n"; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; } - boost::asio::io_service io_service; - - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), argv[1], argv[2]); - tcp::resolver::iterator iterator = resolver.resolve(query); - - tcp::socket s(io_service); - boost::asio::connect(s, iterator); - - using namespace std; // For strlen. - std::cout << "Enter message: "; - char request[max_length]; - std::cin.getline(request, max_length); - size_t request_length = strlen(request); - boost::asio::write(s, boost::asio::buffer(request, request_length)); - - char reply[max_length]; - size_t reply_length = boost::asio::read(s, - boost::asio::buffer(reply, request_length)); - std::cout << "Reply is: "; - std::cout.write(reply, reply_length); - std::cout << "\n"; - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; + return EXIT_FAILURE; } diff --git a/examples/asio/echo_server.cpp b/examples/asio/echo_server_spawn.cpp similarity index 65% rename from examples/asio/echo_server.cpp rename to examples/asio/echo_server_spawn.cpp index ded9061e..46ccd85d 100644 --- a/examples/asio/echo_server.cpp +++ b/examples/asio/echo_server_spawn.cpp @@ -17,8 +17,8 @@ #include #include - -#include "asio_scheduler.hpp" +#include "round_robin.hpp" +#include "spawn.hpp" #include "yield.hpp" using boost::asio::ip::tcp; @@ -27,47 +27,38 @@ const int max_length = 1024; typedef boost::shared_ptr< tcp::socket > socket_ptr; -void session( socket_ptr sock) -{ - try - { +void session( socket_ptr sock) { + try { std::cout << "handler request" << std::endl; - for (;;) - { + for (;;) { char data[max_length]; - boost::system::error_code ec; - std::cout << "before asyc_ready" << std::endl; std::size_t length = sock->async_read_some( boost::asio::buffer( data), boost::fibers::asio::yield[ec]); - std::cout << "after asyc_ready" << std::endl; - if ( ec == boost::asio::error::eof) + if ( ec == boost::asio::error::eof) { break; //connection closed cleanly by peer - else if ( ec) + } else if ( ec) { throw boost::system::system_error( ec); //some other error - - std::cout << "before asyc_write" << std::endl; + } boost::asio::async_write( * sock, boost::asio::buffer( data, length), boost::fibers::asio::yield[ec]); - std::cout << "after asyc_write" << std::endl; - if ( ec == boost::asio::error::eof) + if ( ec == boost::asio::error::eof) { break; //connection closed cleanly by peer - else if ( ec) + } else if ( ec) { throw boost::system::system_error( ec); //some other error + } } + } catch ( std::exception const& e) { + std::cerr << "Exception in fiber: " << e.what() << "\n"; } - catch ( std::exception const& e) - { std::cerr << "Exception in fiber: " << e.what() << "\n"; } } -void server( boost::asio::io_service & io_service, unsigned short port) -{ +void server( boost::asio::io_service & io_service, unsigned short port) { tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port) ); - for (;;) - { + for (;;) { socket_ptr socket( new tcp::socket( io_service) ); boost::system::error_code ec; std::cout << "wait for accept" << std::endl; @@ -76,32 +67,29 @@ void server( boost::asio::io_service & io_service, unsigned short port) boost::fibers::asio::yield[ec]); std::cout << "accepted" << std::endl; if ( ! ec) { - boost::fibers::fiber( - boost::bind( session, socket) ).detach(); + boost::fibers::asio::spawn( + io_service, + boost::bind( session, socket) ); } } } -int main( int argc, char* argv[]) -{ - try - { - if ( argc != 2) - { +int main( int argc, char* argv[]) { + try { + if ( 2 != argc) { std::cerr << "Usage: echo_server \n"; - return 1; + return EXIT_FAILURE; } - boost::asio::io_service io_service; - boost::fibers::use_scheduling_algorithm< asio_scheduler >( io_service); - - boost::fibers::fiber( - boost::bind( server, boost::ref( io_service), std::atoi( argv[1]) ) ).detach(); - + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); + boost::fibers::asio::spawn( + io_service, + boost::bind( server, boost::ref( io_service), std::atoi( argv[1]) ) ); io_service.run(); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; } - catch ( std::exception const& e) - { std::cerr << "Exception: " << e.what() << "\n"; } - return 0; + return EXIT_FAILURE; } diff --git a/examples/asio/echo_server_yield.cpp b/examples/asio/echo_server_yield.cpp new file mode 100644 index 00000000..a800bab8 --- /dev/null +++ b/examples/asio/echo_server_yield.cpp @@ -0,0 +1,91 @@ +// +// echo_server.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// 2013 Oliver Kowalke +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include + +#include +#include "round_robin.hpp" +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr< tcp::socket > socket_ptr; + +void session( socket_ptr sock) { + try { + std::cout << "handler request" << std::endl; + for (;;) { + char data[max_length]; + boost::system::error_code ec; + std::size_t length = sock->async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + boost::asio::async_write( + * sock, + boost::asio::buffer( data, length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + } + } catch ( std::exception const& e) { + std::cerr << "Exception in fiber: " << e.what() << "\n"; + } +} + +void server( boost::asio::io_service & io_service, unsigned short port) { + tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port) ); + for (;;) { + socket_ptr socket( new tcp::socket( io_service) ); + boost::system::error_code ec; + std::cout << "wait for accept" << std::endl; + a.async_accept( + * socket, + boost::fibers::asio::yield[ec]); + std::cout << "accepted" << std::endl; + if ( ! ec) { + boost::fibers::fiber( session, socket).detach(); + } + } +} + +int main( int argc, char* argv[]) { + try { + if ( 2 != argc) { + std::cerr << "Usage: echo_server \n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); + boost::fibers::fiber( + server, boost::ref( io_service), std::atoi( argv[1]) ).detach(); + io_service.run(); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/examples/asio/io_service.cpp b/examples/asio/io_service.cpp deleted file mode 100644 index a062f293..00000000 --- a/examples/asio/io_service.cpp +++ /dev/null @@ -1,173 +0,0 @@ -io_service::io_service( boost::asio::io_service & svc) BOOST_NOEXCEPT : - io_service_( svc), - active_fiber_() -{} - -io_service::~io_service() BOOST_NOEXCEPT -{ -#if 0 - BOOST_FOREACH( detail::fiber_base::ptr_t const& p, rqueue_) - { p->release(); } - - BOOST_FOREACH( detail::fiber_base::ptr_t const& p, wqueue_) - { p->release(); } -#endif -} - -void -io_service::spawn( detail::fiber_base::ptr_t const& f) -{ - BOOST_ASSERT( f); - BOOST_ASSERT( f->is_ready() ); - - // store active fiber in local var - detail::fiber_base::ptr_t tmp = active_fiber_; - try - { - // assign new fiber to active fiber - active_fiber_ = f; - // set active fiber to state_running - active_fiber_->set_running(); - // resume active fiber - active_fiber_->resume(); - // fiber is resumed - - BOOST_ASSERT( f == active_fiber_); - } - catch (...) - { - // reset active fiber to previous - active_fiber_ = tmp; - throw; - } - // reset active fiber to previous - active_fiber_ = tmp; -} - -bool -io_service::run() -{ - // loop over waiting queue - wqueue_t wqueue; - wqueue_work_t wqueue_work; - BOOST_FOREACH( detail::fiber_base::ptr_t const& f, wqueue_) - { - BOOST_ASSERT( ! f->is_running() ); - BOOST_ASSERT( ! f->is_terminated() ); - - // set fiber to state_ready if interruption was requested - // or the fiber was woken up - if ( f->interruption_requested() ) - f->set_ready(); - if ( f->is_ready() ) - { - io_service_.post( - [this, f]() - { - if ( f->is_waiting() ) - { - wqueue_.push_back( f); - wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); - } - else if ( f->is_ready() ) spawn( f); - else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); - }); - } - else - { - wqueue.push_back( f); - wqueue_work.push_back( boost::asio::io_service::work( io_service_) ); - } - } - // exchange local with global waiting queue - wqueue_.swap( wqueue); - wqueue_work_.swap( wqueue_work); - - return io_service_.run_one() > 0; -} - -void -io_service::wait() -{ - BOOST_ASSERT( active_fiber_); - BOOST_ASSERT( active_fiber_->is_running() ); - - // set active_fiber to state_waiting - active_fiber_->set_waiting(); - // push active fiber to wqueue_ - wqueue_.push_back( active_fiber_); - wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); - // store active fiber in local var - detail::fiber_base::ptr_t tmp = active_fiber_; - // suspend active fiber - active_fiber_->suspend(); - // fiber is resumed - - BOOST_ASSERT( tmp == active_fiber_); - BOOST_ASSERT( active_fiber_->is_running() ); -} - -void -io_service::yield() -{ - BOOST_ASSERT( active_fiber_); - BOOST_ASSERT( active_fiber_->is_running() ); - - // set active fiber to state_waiting - active_fiber_->set_ready(); - // push active fiber to wqueue_ - wqueue_.push_back( active_fiber_); - wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); - // store active fiber in local var - detail::fiber_base::ptr_t tmp = active_fiber_; - // suspend acitive fiber - active_fiber_->suspend(); - // fiber is resumed - - BOOST_ASSERT( tmp == active_fiber_); - BOOST_ASSERT( active_fiber_->is_running() ); -} - -void -io_service::join( detail::fiber_base::ptr_t const& f) -{ - BOOST_ASSERT( f); - BOOST_ASSERT( f != active_fiber_); - - if ( active_fiber_) - { - // set active fiber to state_waiting - active_fiber_->set_waiting(); - // push active fiber to wqueue_ - wqueue_.push_back( active_fiber_); - wqueue_work_.push_back( boost::asio::io_service::work( io_service_)); - // add active fiber to joinig-list of f - if ( ! f->join( active_fiber_) ) - // f must be already terminated therefore we set - // active fiber to state_ready - // FIXME: better state_running and no suspend - active_fiber_->set_ready(); - // store active fiber in local var - detail::fiber_base::ptr_t tmp = active_fiber_; - // suspend fiber until f terminates - active_fiber_->suspend(); - // fiber is resumed by f - - BOOST_ASSERT( tmp == active_fiber_); - BOOST_ASSERT( active_fiber_->is_running() ); - - // check if fiber was interrupted - this_fiber::interruption_point(); - } - else - { - while ( ! f->is_terminated() ) - run(); - } - - // check if joined fiber has an exception - // and rethrow exception - if ( f->has_exception() ) f->rethrow(); - - BOOST_ASSERT( f->is_terminated() ); -} diff --git a/examples/asio/loop.hpp b/examples/asio/loop.hpp deleted file mode 100644 index be8538ab..00000000 --- a/examples/asio/loop.hpp +++ /dev/null @@ -1,43 +0,0 @@ - -// Copyright Eugene Yakubovich 2014. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#include - -#include -#include -#include - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace asio { - -//[timer_handler -inline void timer_handler( boost::asio::high_resolution_timer & timer) { - boost::this_fiber::yield(); - timer.expires_from_now( boost::fibers::wait_interval() ); - timer.async_wait( std::bind( timer_handler, std::ref( timer) ) ); -} -//] - -//[run_service -inline void run_service( boost::asio::io_service & io_service) { - boost::asio::high_resolution_timer timer( io_service, std::chrono::seconds(0) ); - timer.async_wait( std::bind( timer_handler, std::ref( timer) ) ); - io_service.run(); -} -//] - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif diff --git a/examples/asio/promise_completion_token.hpp b/examples/asio/promise_completion_token.hpp deleted file mode 100644 index e8e9a590..00000000 --- a/examples/asio/promise_completion_token.hpp +++ /dev/null @@ -1,88 +0,0 @@ -// -// promise_completion_token.hpp -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// modified by Oliver Kowalke and Nat Goodspeed -// - -#ifndef BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP -#define BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP - -#include - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace asio { - -/// Common base class for yield_t and use_future_t. See also yield.hpp and -/// use_future.hpp. -/** - * The awkward name of this class is because it's not intended to be used - * directly in user code: it's the common base class for a couple of user- - * facing placeholder classes yield_t and use_future_t. They - * share a common handler class promise_handler. - * - * Each subclass (e.g. use_future_t) has a canonical instance - * (use_future). These may be used in the following ways as a - * Boost.Asio asynchronous operation completion token: - * - *
- *
boost::fibers::asio::use_future
- *
This is the canonical instance of use_future_t, provided - * solely for convenience. It causes promise_handler to allocate its - * internal boost::fibers::promise using a default-constructed - * default allocator (std::allocator).
- *
boost::fibers::asio::use_future::with(alloc_instance)
- *
This usage specifies an alternate allocator instance - * alloc_instance. It causes promise_handler to allocate its - * internal boost::fibers::promise using the specified - * allocator.
- *
- */ -//[fibers_asio_promise_completion_token -template< typename Allocator > -class promise_completion_token { -public: - typedef Allocator allocator_type; - - /// Construct using default-constructed allocator. - BOOST_CONSTEXPR promise_completion_token() : - ec_( nullptr) { - } - - /// Construct using specified allocator. - explicit promise_completion_token( Allocator const& allocator) : - ec_( nullptr), - allocator_( allocator) { - } - - /// Obtain allocator. - allocator_type get_allocator() const { - return allocator_; - } - -//private: - // used by some subclasses to bind an error_code to suppress exceptions - boost::system::error_code * ec_; - -private: - Allocator allocator_; -}; -//] - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP diff --git a/examples/asio/ps/publisher.cpp b/examples/asio/ps/publisher.cpp new file mode 100644 index 00000000..55728826 --- /dev/null +++ b/examples/asio/ps/publisher.cpp @@ -0,0 +1,52 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: publisher \n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9997"); + tcp::resolver::iterator iterator = resolver.resolve(query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string channel( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, channel.c_str(), channel.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + std::cout << "publish: "; + char request[max_length]; + std::cin.getline( request, max_length); + boost::asio::write( s, boost::asio::buffer( request, max_length) ); + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/examples/asio/publish_subscribe/server.cpp b/examples/asio/ps/server.cpp similarity index 78% rename from examples/asio/publish_subscribe/server.cpp rename to examples/asio/ps/server.cpp index 8c92b7d8..ba27b37b 100644 --- a/examples/asio/publish_subscribe/server.cpp +++ b/examples/asio/ps/server.cpp @@ -27,9 +27,6 @@ #include -#include "../loop.hpp" -#include "../yield.hpp" - using boost::asio::ip::tcp; const std::size_t max_length = 1024; @@ -39,30 +36,30 @@ typedef boost::shared_ptr subscriber_session_ptr; // a channel has n subscribers (subscriptions) // this class holds a list of subcribers for one channel -class subscriptions -{ +class subscriptions { public: ~subscriptions(); // subscribe to this channel - void subscribe( subscriber_session_ptr const& s) - { subscribers_.insert( s); } + void subscribe( subscriber_session_ptr const& s) { + subscribers_.insert( s); + } // unsubscribe from this channel - void unsubscribe( subscriber_session_ptr const& s) - { subscribers_.erase(s); } + void unsubscribe( subscriber_session_ptr const& s) { + subscribers_.erase(s); + } // publish a message, e.g. push this message to all subscribers void publish( std::string const& msg); private: // list of subscribers - std::set subscribers_; + std::set< subscriber_session_ptr > subscribers_; }; // a class to register channels and to subsribe clients to this channels -class registry : private boost::noncopyable -{ +class registry : private boost::noncopyable { private: typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont; typedef channels_cont::iterator channels_iter; @@ -70,102 +67,93 @@ private: boost::fibers::mutex mtx_; channels_cont channels_; - void register_channel_( std::string const& channel) - { - if ( channels_.end() != channels_.find( channel) ) + void register_channel_( std::string const& channel) { + if ( channels_.end() != channels_.find( channel) ) { throw std::runtime_error("channel already exists"); + } channels_[channel] = boost::make_shared< subscriptions >(); std::cout << "new channel '" << channel << "' registered" << std::endl; } - void unregister_channel_( std::string const& channel) - { + void unregister_channel_( std::string const& channel) { channels_.erase( channel); std::cout << "channel '" << channel << "' unregistered" << std::endl; } - void subscribe_( std::string const& channel, subscriber_session_ptr s) - { + void subscribe_( std::string const& channel, subscriber_session_ptr s) { channels_iter iter = channels_.find( channel); - if ( channels_.end() == iter ) + if ( channels_.end() == iter ) { throw std::runtime_error("channel does not exist"); + } iter->second->subscribe( s); std::cout << "new subscription to channel '" << channel << "'" << std::endl; } - void unsubscribe_( std::string const& channel, subscriber_session_ptr s) - { + void unsubscribe_( std::string const& channel, subscriber_session_ptr s) { channels_iter iter = channels_.find( channel); - if ( channels_.end() != iter ) + if ( channels_.end() != iter ) { iter->second->unsubscribe( s); + } } - void publish_( std::string const& channel, std::string const& msg) - { + void publish_( std::string const& channel, std::string const& msg) { channels_iter iter = channels_.find( channel); - if ( channels_.end() == iter ) + if ( channels_.end() == iter ) { throw std::runtime_error("channel does not exist"); + } iter->second->publish( msg); std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl; } public: // add a channel to registry - void register_channel( std::string const& channel) - { + void register_channel( std::string const& channel) { std::unique_lock< boost::fibers::mutex > lk( mtx_); register_channel_( channel); } // remove a channel from registry - void unregister_channel( std::string const& channel) - { + void unregister_channel( std::string const& channel) { std::unique_lock< boost::fibers::mutex > lk( mtx_); unregister_channel_( channel); } // subscribe to a channel - void subscribe( std::string const& channel, subscriber_session_ptr s) - { + void subscribe( std::string const& channel, subscriber_session_ptr s) { std::unique_lock< boost::fibers::mutex > lk( mtx_); subscribe_( channel, s); } // unsubscribe from a channel - void unsubscribe( std::string const& channel, subscriber_session_ptr s) - { + void unsubscribe( std::string const& channel, subscriber_session_ptr s) { std::unique_lock< boost::fibers::mutex > lk( mtx_); unsubscribe_( channel, s); } // publish a message to all subscribers registerd to the channel - void publish( std::string const& channel, std::string const& msg) - { + void publish( std::string const& channel, std::string const& msg) { std::unique_lock< boost::fibers::mutex > lk( mtx_); publish_( channel, msg); } }; // a subscriber subscribes to a given channel in order to receive messages published on this channel -class subscriber_session : public boost::enable_shared_from_this< subscriber_session > -{ +class subscriber_session : public boost::enable_shared_from_this< subscriber_session > { public: explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) : socket_( io_service), - reg_( reg) - {} + reg_( reg) { + } - tcp::socket& socket() - { return socket_; } + tcp::socket& socket() { + return socket_; + } // this function is executed inside the fiber - void run() - { + void run() { std::string channel; - try - { + try { boost::system::error_code ec; - // read first message == channel name // async_ready() returns if the the complete message is read // until this the fiber is suspended until the complete message @@ -174,17 +162,16 @@ public: socket_, boost::asio::buffer( data_), boost::fibers::asio::yield[ec]); - if ( ec) throw std::runtime_error("no channel from subscriber"); + if ( ec) { + throw std::runtime_error("no channel from subscriber"); + } // first message ist equal to the channel name the publisher // publishes to channel = data_; - // subscribe to new channel reg_.subscribe( channel, shared_from_this() ); - // read published messages - for (;;) - { + for (;;) { // wait for a conditon-variable for new messages // the fiber will be suspended until the condtion // becomes true and the fiber is resumed @@ -194,10 +181,10 @@ public: std::string data( data_); lk.unlock(); std::cout << "subscriber::run(): '" << data << std::endl; - // message '' terminates subscription - if ( "" == data) break; - + if ( "" == data) { + break; + } // async. write message to socket connected with // subscriber // async_write() returns if the complete message was writen @@ -206,16 +193,16 @@ public: socket_, boost::asio::buffer( data, data.size() ), boost::fibers::asio::yield[ec]); - if ( ec == boost::asio::error::eof) + if ( ec == boost::asio::error::eof) { break; //connection closed cleanly by peer - else if ( ec) + } else if ( ec) { throw boost::system::system_error( ec); //some other error + } std::cout << "subscriber::run(): '" << data << " written" << std::endl; } + } catch ( std::exception const& e) { + std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl; } - catch ( std::exception const& e) - { std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl; } - // close socket socket_.close(); // unregister channel @@ -223,11 +210,10 @@ public: } // called from publisher_session (running in other fiber) - void publish( std::string const& msg) - { + void publish( std::string const& msg) { std::unique_lock< boost::fibers::mutex > lk( mtx_); - std::memset(data_, '\0', sizeof( data_)); - std::memcpy(data_, msg.c_str(), (std::min)(max_length, msg.size())); + std::memset( data_, '\0', sizeof( data_)); + std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size())); cond_.notify_one(); } @@ -241,43 +227,39 @@ private: }; -subscriptions::~subscriptions() -{ - BOOST_FOREACH( subscriber_session_ptr s, subscribers_) - { s->publish(""); } +subscriptions::~subscriptions() { + BOOST_FOREACH( subscriber_session_ptr s, subscribers_) { + s->publish(""); + } } void -subscriptions::publish( std::string const& msg) -{ - BOOST_FOREACH( subscriber_session_ptr s, subscribers_) - { s->publish( msg); } +subscriptions::publish( std::string const& msg) { + BOOST_FOREACH( subscriber_session_ptr s, subscribers_) { + s->publish( msg); + } } // a publisher publishes messages on its channel // subscriber might register to this channel to get the published messages -class publisher_session : public boost::enable_shared_from_this< publisher_session > -{ +class publisher_session : public boost::enable_shared_from_this< publisher_session > { public: explicit publisher_session( boost::asio::io_service & io_service, registry & reg) : socket_( io_service), - reg_( reg) - {} + reg_( reg) { + } - tcp::socket& socket() - { return socket_; } + tcp::socket& socket() { + return socket_; + } // this function is executed inside the fiber - void run() - { + void run() { std::string channel; - try - { + try { boost::system::error_code ec; - // fixed size message char data[max_length]; - // read first message == channel name // async_ready() returns if the the complete message is read // until this the fiber is suspended until the complete message @@ -286,17 +268,16 @@ public: socket_, boost::asio::buffer( data), boost::fibers::asio::yield[ec]); - if ( ec) throw std::runtime_error("no channel from publisher"); + if ( ec) { + throw std::runtime_error("no channel from publisher"); + } // first message ist equal to the channel name the publisher // publishes to channel = data; - // register the new channel reg_.register_channel( channel); - // start publishing messages - for (;;) - { + for (;;) { // read message from publisher asyncronous // async_read() suspends this fiber until the complete emssage is read // and stored in the given buffer 'data' @@ -304,18 +285,17 @@ public: socket_, boost::asio::buffer( data), boost::fibers::asio::yield[ec]); - if ( ec == boost::asio::error::eof) + if ( ec == boost::asio::error::eof) { break; //connection closed cleanly by peer - else if ( ec) + } else if ( ec) { throw boost::system::system_error( ec); //some other error - + } // publish message to all subscribers reg_.publish( channel, std::string( data) ); } + } catch ( std::exception const& e) { + std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl; } - catch ( std::exception const& e) - { std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl; } - // close socket socket_.close(); // unregister channel @@ -331,15 +311,12 @@ typedef boost::shared_ptr< publisher_session > publisher_session_ptr; // function accepts connections requests from clients acting as a publisher void accept_publisher( boost::asio::io_service& io_service, - unsigned short port, - registry & reg) -{ + unsigned short port, + registry & reg) { // create TCP-acceptor tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); - // loop for accepting connection requests - for (;;) - { + for (;;) { boost::system::error_code ec; // create new publisher-session // this instance will be associated with one publisher @@ -363,14 +340,11 @@ void accept_publisher( boost::asio::io_service& io_service, // function accepts connections requests from clients acting as a subscriber void accept_subscriber( boost::asio::io_service& io_service, unsigned short port, - registry & reg) -{ + registry & reg) { // create TCP-acceptor tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); - // loop for accepting connection requests - for (;;) - { + for (;;) { boost::system::error_code ec; // create new subscriber-session // this instance will be associated with one subscriber @@ -392,30 +366,26 @@ void accept_subscriber( boost::asio::io_service& io_service, } -int main( int argc, char* argv[]) -{ - try - { +int main( int argc, char* argv[]) { + try { // create io_service for async. I/O boost::asio::io_service io_service; - + // register asio scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); // registry for channels and its subscription registry reg; - // create an acceptor for publishers, run it as fiber boost::fibers::fiber( - boost::bind( accept_publisher, - boost::ref( io_service), 9997, boost::ref( reg)) ).detach(); - + accept_publisher, boost::ref( io_service), 9997, boost::ref( reg) ).detach(); // create an acceptor for subscribers, run it as fiber boost::fibers::fiber( - boost::bind( accept_subscriber, - boost::ref( io_service), 9998, boost::ref( reg)) ).detach(); - - boost::fibers::asio::run_service( io_service); + accept_subscriber, boost::ref( io_service), 9998, boost::ref( reg) ).detach(); + // dispatch + io_service.run(); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; } - catch ( std::exception const& e) - { std::cerr << "Exception: " << e.what() << "\n"; } - return 0; + return EXIT_FAILURE; } diff --git a/examples/asio/ps/subscriber.cpp b/examples/asio/ps/subscriber.cpp new file mode 100644 index 00000000..7acabe70 --- /dev/null +++ b/examples/asio/ps/subscriber.cpp @@ -0,0 +1,53 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include + +using boost::asio::ip::tcp; + +enum { + max_length = 1024 +}; + +int main( int argc, char* argv[]) { + try { + if ( 3 != argc) { + std::cerr << "Usage: subscriber \n"; + return EXIT_FAILURE; + } + boost::asio::io_service io_service; + tcp::resolver resolver( io_service); + tcp::resolver::query query( tcp::v4(), argv[1], "9998"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( io_service); + boost::asio::connect( s, iterator); + char msg[max_length]; + std::string channel( argv[2]); + std::memset( msg, '\0', max_length); + std::memcpy( msg, channel.c_str(), channel.size() ); + boost::asio::write( s, boost::asio::buffer( msg, max_length) ); + for (;;) { + char reply[max_length]; + size_t reply_length = s.read_some( boost::asio::buffer( reply, max_length) ); + std::cout << "published: "; + std::cout.write( reply, reply_length); + std::cout << std::endl; + } + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return EXIT_FAILURE; +} diff --git a/examples/asio/publish_subscribe/publisher.cpp b/examples/asio/publish_subscribe/publisher.cpp deleted file mode 100644 index 6ed39745..00000000 --- a/examples/asio/publish_subscribe/publisher.cpp +++ /dev/null @@ -1,62 +0,0 @@ -// -// blocking_tcp_echo_client.cpp -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include -#include -#include - -#include -#include -#include - -using boost::asio::ip::tcp; - -enum { max_length = 1024 }; - -int main(int argc, char* argv[]) -{ - try - { - if (argc != 3) - { - std::cerr << "Usage: publisher \n"; - return 1; - } - - boost::asio::io_service io_service; - - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), argv[1], "9997"); - tcp::resolver::iterator iterator = resolver.resolve(query); - - tcp::socket s(io_service); - boost::asio::connect(s, iterator); - - char msg[max_length]; - std::string channel(argv[2]); - std::memset(msg, '\0', max_length); - std::memcpy(msg, channel.c_str(), channel.size() ); - boost::asio::write(s, boost::asio::buffer(msg, max_length)); - - for (;;) - { - std::cout << "publish: "; - char request[max_length]; - std::cin.getline(request, max_length); - boost::asio::write(s, boost::asio::buffer(request, max_length)); - } - } - catch ( std::exception const& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; -} diff --git a/examples/asio/publish_subscribe/subscriber.cpp b/examples/asio/publish_subscribe/subscriber.cpp deleted file mode 100644 index 1362730a..00000000 --- a/examples/asio/publish_subscribe/subscriber.cpp +++ /dev/null @@ -1,64 +0,0 @@ -// -// blocking_tcp_echo_client.cpp -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include -#include -#include - -#include -#include -#include - -using boost::asio::ip::tcp; - -enum { max_length = 1024 }; - -int main(int argc, char* argv[]) -{ - try - { - if (argc != 3) - { - std::cerr << "Usage: subscriber \n"; - return 1; - } - - boost::asio::io_service io_service; - - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), argv[1], "9998"); - tcp::resolver::iterator iterator = resolver.resolve(query); - - tcp::socket s(io_service); - boost::asio::connect(s, iterator); - - char msg[max_length]; - std::string channel(argv[2]); - std::memset(msg, '\0', max_length); - std::memcpy(msg, channel.c_str(), channel.size() ); - boost::asio::write(s, boost::asio::buffer(msg, max_length)); - - for (;;) - { - char reply[max_length]; - size_t reply_length = s.read_some( - boost::asio::buffer(reply, max_length)); - std::cout << "published: "; - std::cout.write(reply, reply_length); - std::cout << std::endl; - } - } - catch (std::exception const& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; -} diff --git a/examples/asio/asio_scheduler.hpp b/examples/asio/round_robin.hpp similarity index 65% rename from examples/asio/asio_scheduler.hpp rename to examples/asio/round_robin.hpp index d56be631..9761f6e2 100644 --- a/examples/asio/asio_scheduler.hpp +++ b/examples/asio/round_robin.hpp @@ -3,38 +3,50 @@ // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) -#ifndef ASIO_SCHEDULER_H -#define ASIO_SCHEDULER_H +#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H +#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H #include #include +#include #include #include -#include +#include +#include +#include +#include "yield.hpp" #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX #endif -class asio_scheduler : public boost::fibers::sched_algorithm { +namespace boost { +namespace fibers { +namespace asio { + +class round_robin : public boost::fibers::sched_algorithm { private: boost::asio::io_service & io_svc_; boost::asio::steady_timer timer_; boost::fibers::scheduler::ready_queue_t ready_queue_; - void do_yield() { - boost::this_fiber::yield(); - io_svc_.post( std::bind( & asio_scheduler::do_yield, this) ); + void progress_() { + boost::this_fiber::yield(); + io_svc_.post( + std::bind( & round_robin::progress_, this) ); } public: - asio_scheduler( boost::asio::io_service & io_svc) : + round_robin( boost::asio::io_service & io_svc) : io_svc_( io_svc), timer_( io_svc_), ready_queue_() { - io_svc_.post( std::bind( & asio_scheduler::do_yield, this) ); +#if 0 + io_svc_.post( + std::bind( & round_robin::progress_, this) ); +#endif } void awakened( boost::fibers::context * ctx) { @@ -59,10 +71,15 @@ public: return ! ready_queue_.empty(); } + void foo_bar() { + while ( 0 < io_svc_.poll_one() ) + std::cout << "foo_bar()" << std::endl; + } + void suspend_until( std::chrono::steady_clock::time_point const& suspend_time) { timer_.expires_at( suspend_time); boost::system::error_code ignored_ec; - timer_.wait( ignored_ec); + timer_.async_wait( boost::fibers::asio::yield[ignored_ec]); } void notify() { @@ -70,8 +87,10 @@ public: } }; +}}} + #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_SUFFIX #endif -#endif // ASIO_SCHEDULER_H +#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H diff --git a/examples/asio/sleep.cpp b/examples/asio/sleep.cpp index 8b850ef3..29f189bb 100644 --- a/examples/asio/sleep.cpp +++ b/examples/asio/sleep.cpp @@ -14,24 +14,42 @@ #include #include +#include #include -#include "asio_scheduler.hpp" +#include "round_robin.hpp" #include "spawn.hpp" -void foo( boost::fibers::asio::yield_context yield) { - std::cout << "sleep 1s" << std::endl; +void foo( boost::asio::io_service & io_svc, boost::fibers::asio::yield_context yield) { + boost::asio::steady_timer timer( io_svc); + boost::system::error_code ignored_ec; + + std::cout << "foo(): sleep 1s" << std::endl; boost::this_fiber::sleep_for( std::chrono::seconds( 1) ); - std::cout << "woken up after 1s" << std::endl; + std::cout << "foo(): woken up after 1s" << std::endl; + + std::cout << "foo(): sleep 1s" << std::endl; + timer.expires_from_now( std::chrono::seconds( 1) ); + timer.async_wait( boost::fibers::asio::yield[ignored_ec]); + std::cout << "foo(): woken up after 1s" << std::endl; + + std::cout << "foo(): sleep 1s" << std::endl; + timer.expires_from_now( std::chrono::seconds( 1) ); + timer.async_wait( boost::fibers::asio::yield[ignored_ec]); + std::cout << "foo(): woken up after 1s" << std::endl; + + std::cout << "foo(): sleep 1s" << std::endl; + boost::this_fiber::sleep_for( std::chrono::seconds( 1) ); + std::cout << "foo(): woken up after 1s" << std::endl; } int main( int argc, char* argv[]) { try { boost::asio::io_service io_svc; - boost::fibers::use_scheduling_algorithm< asio_scheduler >( io_svc); + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); - boost::fibers::asio::spawn( io_svc, std::bind( foo, std::placeholders::_1) ); + boost::fibers::asio::spawn( io_svc, std::bind( foo, std::ref( io_svc), std::placeholders::_1) ); io_svc.run(); diff --git a/examples/asio/spawn.hpp b/examples/asio/spawn.hpp index ba3443e6..a4bc409b 100644 --- a/examples/asio/spawn.hpp +++ b/examples/asio/spawn.hpp @@ -18,7 +18,7 @@ #include #include -#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -28,193 +28,53 @@ namespace boost { namespace fibers { namespace asio { -/// Context object the represents the currently executing fiber. -/** - * The basic_yield_context class is used to represent the currently executing - * fiber. A basic_yield_context may be passed as a handler to an * asynchronous - * operation. For example: - * - * @code template< typename Handler > - * void my_fiber( basic_yield_context< Handler > yield) - * { - * ... - * std::size_t n = my_socket.async_read_some( buffer, yield); - * ... - * } @endcode - * - * The initiating function (async_read_some in the above example) suspends the - * current fiber. The fiber is resumed when the asynchronous operation - * completes, and the result of the operation is returned. - */ template< typename Handler > -class basic_yield_context -{ +class basic_yield_context { public: - /// Construct a yield context to represent the specified fiber. - /** - * Most applications do not need to use this constructor. Instead, the - * spawn() function passes a yield context as an argument to the fiber - * function. - */ basic_yield_context( boost::fibers::context * ctx, Handler& handler) : ctx_( ctx), handler_( handler), - ec_( 0) - {} - - /// Return a yield context that sets the specified error_code. - /** - * By default, when a yield context is used with an asynchronous operation, a - * non-success error_code is converted to system_error and thrown. This - * operator may be used to specify an error_code object that should instead be - * set with the asynchronous operation's result. For example: - * - * @code template< typename Handler > - * void my_fiber( basic_yield_context< Handler > yield) - * { - * ... - * std::size_t n = my_socket.async_read_some( buffer, yield[ec]); - * if ( ec) - * { - * // An error occurred. - * } - * ... - * } @endcode - */ - basic_yield_context operator[]( boost::system::error_code & ec) - { - basic_yield_context tmp( * this); - tmp.ec_ = & ec; - return tmp; + ec_( 0) { + } + + basic_yield_context operator[]( boost::system::error_code & ec) { + basic_yield_context tmp( * this); + tmp.ec_ = & ec; + return tmp; } -#if defined(GENERATING_DOCUMENTATION) private: -#endif // defined(GENERATING_DOCUMENTATION) boost::fibers::context * ctx_; Handler & handler_; boost::system::error_code * ec_; }; -#if defined(GENERATING_DOCUMENTATION) -/// Context object the represents the currently executing fiber. -typedef basic_yield_context< unspecified > yield_context; -#else // defined(GENERATING_DOCUMENTATION) typedef basic_yield_context< boost::asio::detail::wrapped_handler< boost::asio::io_service::strand, void(*)(), boost::asio::detail::is_continuation_if_running> > yield_context; -#endif // defined(GENERATING_DOCUMENTATION) -/** - * @defgroup spawn boost::fibers::asio::spawn - * - * @brief Start a new stackful fiber. - * - * The spawn() function is a high-level wrapper over the Boost.Fiber - * library. This function enables programs to implement asynchronous logic in a - * synchronous manner, as illustrated by the following example: - * - * @code boost::asio::spawn( my_strand, do_echo); - * - * // ... - * - * void do_echo( boost::fibers::asio::yield_context yield) - * { - * try - * { - * char data[128]; - * for (;;) - * { - * std::size_t length = - * my_socket.async_read_some( - * boost::asio::buffer( data), yield); - * - * boost::asio::async_write( my_socket, - * boost::asio::buffer( data, length), yield); - * } - * } - * catch ( std::exception const& e) - * { - * // ... - * } - * } @endcode - */ -/*@{*/ - -/// Start a new fiber, calling the specified handler when it completes. -/** - * This function is used to launch a new fiber. - * - * @param handler A handler to be called when the fiber exits. More - * importantly, the handler provides an execution context (via the handler - * invocation hook) for the fiber. The handler must have the signature: - * @code void handler(); @endcode - * - * @param function The fiber function. The function must have the signature: - * @code void function( basic_yield_context< Handler > yield); @endcode - * - */ template< typename Handler, typename Function > void spawn( boost::asio::io_service & io_service, BOOST_ASIO_MOVE_ARG( Handler) handler, BOOST_ASIO_MOVE_ARG( Function) function); -/// Start a new fiber, inheriting the execution context of another. -/** - * This function is used to launch a new fiber. - * - * @param ctx Identifies the current fiber as a parent of the new - * fiber. This specifies that the new fiber should inherit the - * execution context of the parent. For example, if the parent fiber is - * executing in a particular strand, then the new fiber will execute in the - * same strand. - * - * @param function The fiber function. The function must have the signature: - * @code void function( basic_yield_context< Handler > yield); @endcode - * - */ template< typename Handler, typename Function > void spawn( boost::asio::io_service & io_service, basic_yield_context< Handler > ctx, BOOST_ASIO_MOVE_ARG( Function) function); -/// Start a new fiber that executes in the contex of a strand. -/** - * This function is used to launch a new fiber. - * - * @param strand Identifies a strand. By starting multiple fibers on the - * same strand, the implementation ensures that none of those fibers can - * execute simultaneously. - * - * @param function The fiber function. The function must have the signature: - * @code void function( yield_context yield); @endcode - * - */ template< typename Function > void spawn( boost::asio::io_service::strand strand, BOOST_ASIO_MOVE_ARG( Function) function); -/// Start a new fiber that executes on a given io_service. -/** - * This function is used to launch a new fiber. - * - * @param io_service Identifies the io_service that will run the fiber. The - * new fiber is implicitly given its own strand within this io_service. - * - * @param function The fiber function. The function must have the signature: - * @code void function( yield_context yield); @endcode - * - */ template< typename Function > void spawn( boost::asio::io_service & io_service, BOOST_ASIO_MOVE_ARG( Function) function); -} // namespace asio -} // namespace fibers -} // namespace boost +}}} #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_SUFFIX diff --git a/examples/asio/use_future.hpp b/examples/asio/use_future.hpp index 66c10b0a..d2e6c3fd 100644 --- a/examples/asio/use_future.hpp +++ b/examples/asio/use_future.hpp @@ -7,15 +7,16 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -// modified by Oliver Kowalke and Nat Goodspeed +// modified by Oliver Kowalke // #ifndef BOOST_FIBERS_ASIO_USE_FUTURE_HPP #define BOOST_FIBERS_ASIO_USE_FUTURE_HPP -#include // std::allocator +#include + #include -#include "promise_completion_token.hpp" +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -25,32 +26,18 @@ namespace boost { namespace fibers { namespace asio { -/// Class used to specify that a Boost.Asio asynchronous operation should -/// return a future. -/** - * The use_future_t class is used to indicate that a Boost.Asio asynchronous - * operation should return a boost::fibers::future object. A use_future_t - * object may be passed as a handler to an asynchronous operation, typically - * using the special value @c boost::fibers::asio::use_future. For example: - * - * @code boost::fibers::future my_future - * = my_socket.async_read_some(my_buffer, boost::fibers::asio::use_future); @endcode - * - * The initiating function (async_read_some in the above example) returns a - * future that will receive the result of the operation. If the operation - * completes with an error_code indicating failure, it is converted into a - * system_error and passed back to the caller via the future. - */ template< typename Allocator = std::allocator< void > > -class use_future_t : public promise_completion_token< Allocator > { +class use_future_t { public: + typedef Allocator allocator_type; + /// Construct using default-constructed allocator. BOOST_CONSTEXPR use_future_t() { } /// Construct using specified allocator. explicit use_future_t( Allocator const& allocator) : - promise_completion_token( allocator) { + allocator_( allocator) { } /// Specify an alternate allocator. @@ -58,12 +45,23 @@ public: use_future_t< OtherAllocator > operator[]( OtherAllocator const& allocator) const { return use_future_t< OtherAllocator >( allocator); } + + /// Obtain allocator. + allocator_type get_allocator() const { + return allocator_; + } + +private: + Allocator allocator_; }; /// A special value, similar to std::nothrow. +/** + * See the documentation for boost::asio::use_future_t for a usage example. + */ BOOST_CONSTEXPR_OR_CONST use_future_t<> use_future; -}}} // namespace asio +}}} #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_SUFFIX diff --git a/examples/asio/yield.hpp b/examples/asio/yield.hpp index 1e32e626..099cf519 100644 --- a/examples/asio/yield.hpp +++ b/examples/asio/yield.hpp @@ -7,15 +7,15 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -// modified by Oliver Kowalke and Nat Goodspeed +// modified by Oliver Kowalke // #ifndef BOOST_FIBERS_ASIO_YIELD_HPP #define BOOST_FIBERS_ASIO_YIELD_HPP -#include // std::allocator -#include -#include "promise_completion_token.hpp" +#include + +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -25,96 +25,23 @@ namespace boost { namespace fibers { namespace asio { -/// Class used to specify that a Boost.Asio asynchronous operation should -/// suspend the calling fiber until completion. -/** - * The yield_t class is used to indicate that a Boost.Asio asynchronous - * operation should suspend the calling fiber until its completion. The - * asynchronous function will either return a suitable value, or will throw an - * exception indicating the error. A yield_t object may be passed as a handler - * to an asynchronous operation, typically using the special value @c - * boost::fibers::asio::yield. For example: - * - * @code std::size_t length_read - * = my_socket.async_read_some(my_buffer, boost::fibers::asio::yield); @endcode - * - * The initiating function (async_read_some in the above example) does not - * return to the calling fiber until the asynchronous read has completed. Like - * its synchronous counterpart, it returns the result of the operation. If the - * operation completes with an error_code indicating failure, it is converted - * into a system_error and thrown as an exception. - * - * To suppress a possible error exception: - * @code - * boost::system::error_code ec; - * std::size_t length_read = - * my_socket.async_read_some(my_buffer, boost::fibers::asio::yield[ec]); - * // test ec for success - * @endcode - * - * The crucial distinction between - * @code - * std::size_t length_read = my_socket.read_some(my_buffer); - * @endcode - * and - * @code - * std::size_t length_read = - * my_socket.async_read_some(my_buffer, boost::fibers::asio::yield); - * @code - * is that read_some() blocks the entire calling @em thread, whereas - * async_read_some(..., boost::fibers::asio::yield) blocks only the - * calling @em fiber, permitting other fibers on the same thread to continue - * running. - * - * To specify an alternate allocator for the internal - * boost::fibers::promise: - * @code - * boost::fibers::asio::yield.with(alloc_instance) - * @endcode - * - * To bind a boost::system::error_code @a ec as well as using an - * alternate allocator: - * @code - * boost::fibers::asio::yield.with(alloc_instance)[ec] - * @endcode - */ -//[fibers_asio_yield_t -template< typename Allocator = std::allocator< void > > -class yield_t : public promise_completion_token< Allocator > { +class yield_t { public: - /// Construct with default-constructed allocator. - BOOST_CONSTEXPR yield_t() { - } -/*= // ... ways to use an alternate allocator or bind an error_code ...*/ -/*=};*/ -//] - - /// Construct using specified allocator. - explicit yield_t( Allocator const& allocator) : - promise_completion_token< Allocator >( allocator) { + BOOST_CONSTEXPR yield_t() : + ec_( 0) { } - /// Specify an alternate allocator. - template< typename OtherAllocator > - yield_t< OtherAllocator > with( OtherAllocator const& allocator) const { - return yield_t< OtherAllocator >( allocator); - } - - /// Bind an error_code to suppress error exception. yield_t operator[]( boost::system::error_code & ec) const { - // Return a copy because typical usage will be on our canonical - // instance. Don't leave the canonical instance with a dangling - // binding to a transient error_code! yield_t tmp; tmp.ec_ = & ec; return tmp; } + +//private: + boost::system::error_code * ec_; }; -//[fibers_asio_yield -/// A special value, similar to std::nothrow. -BOOST_CONSTEXPR_OR_CONST yield_t<> yield; -//] +BOOST_CONSTEXPR_OR_CONST yield_t yield; }}}