From 253d981f52ebcafc9d698d6d896607a6f0d807f0 Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Tue, 29 Sep 2015 17:55:46 +0200 Subject: [PATCH] examples added --- examples/adapt_callbacks.cpp | 222 ++++ examples/adapt_method_calls.cpp | 167 +++ examples/adapt_nonblocking.cpp | 207 ++++ examples/asio/daytime_client.cpp | 98 ++ examples/asio/daytime_client2.cpp | 89 ++ examples/asio/detail/promise_handler.hpp | 172 +++ examples/asio/detail/use_future.hpp | 99 ++ examples/asio/detail/yield.hpp | 109 ++ examples/asio/echo_client.cpp | 63 ++ examples/asio/echo_client2.cpp | 68 ++ examples/asio/echo_server.cpp | 101 ++ examples/asio/echo_server2.cpp | 137 +++ examples/asio/loop.hpp | 43 + examples/asio/promise_completion_token.hpp | 88 ++ examples/asio/publish_subscribe/publisher.cpp | 62 + examples/asio/publish_subscribe/server.cpp | 421 +++++++ .../asio/publish_subscribe/subscriber.cpp | 64 ++ examples/asio/use_future.hpp | 74 ++ examples/asio/yield.hpp | 127 +++ examples/priority.cpp | 337 ++++++ examples/wait_stuff.cpp | 1006 +++++++++++++++++ examples/work_sharing.cpp | 178 +++ 22 files changed, 3932 insertions(+) create mode 100644 examples/adapt_callbacks.cpp create mode 100644 examples/adapt_method_calls.cpp create mode 100644 examples/adapt_nonblocking.cpp create mode 100644 examples/asio/daytime_client.cpp create mode 100644 examples/asio/daytime_client2.cpp create mode 100644 examples/asio/detail/promise_handler.hpp create mode 100644 examples/asio/detail/use_future.hpp create mode 100644 examples/asio/detail/yield.hpp create mode 100644 examples/asio/echo_client.cpp create mode 100644 examples/asio/echo_client2.cpp create mode 100644 examples/asio/echo_server.cpp create mode 100644 examples/asio/echo_server2.cpp create mode 100644 examples/asio/loop.hpp create mode 100644 examples/asio/promise_completion_token.hpp create mode 100644 examples/asio/publish_subscribe/publisher.cpp create mode 100644 examples/asio/publish_subscribe/server.cpp create mode 100644 examples/asio/publish_subscribe/subscriber.cpp create mode 100644 examples/asio/use_future.hpp create mode 100644 examples/asio/yield.hpp create mode 100644 examples/priority.cpp create mode 100644 examples/wait_stuff.cpp create mode 100644 examples/work_sharing.cpp diff --git a/examples/adapt_callbacks.cpp b/examples/adapt_callbacks.cpp new file mode 100644 index 00000000..b074500e --- /dev/null +++ b/examples/adapt_callbacks.cpp @@ -0,0 +1,222 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include +#include // std::tie() +#include + +/***************************************************************************** +* example async API +*****************************************************************************/ +//[AsyncAPI +class AsyncAPI { +public: + // constructor acquires some resource that can be read and written + AsyncAPI(); + + // callbacks accept an int error code; 0 == success + typedef int errorcode; + + // write callback only needs to indicate success or failure + void init_write( std::string const& data, + std::function< void( errorcode) > const& callback); + + // read callback needs to accept both errorcode and data + void init_read( std::function< void( errorcode, std::string const&) > const&); + + // ... other operations ... +//<- + void inject_error( errorcode ec); + +private: + std::string data_; + errorcode injected_; +//-> +}; +//] + +/***************************************************************************** +* fake AsyncAPI implementation... pay no attention to the little man behind +* the curtain... +*****************************************************************************/ +AsyncAPI::AsyncAPI() : + injected_( 0) { +} + +void AsyncAPI::inject_error( errorcode ec) { + injected_ = ec; +} + +void AsyncAPI::init_write( std::string const& data, + std::function< void( errorcode) > const& callback) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // update data_ (this might be an echo service) + if ( ! injected) { + data_ = data; + } + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling completion callback. Echo back to + // caller any previously-injected errorcode. + std::thread( [injected, callback](){ + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + callback( injected); + }).detach(); +} + +void AsyncAPI::init_read( std::function< void( errorcode, std::string const&) > const& callback) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // local copy of data_ so we can capture in lambda + std::string data( data_); + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling completion callback. Echo back to + // caller any previously-injected errorcode. + std::thread( [injected, callback, data](){ + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + callback( injected, data); + }).detach(); +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +// helper function used in a couple of the adapters +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode); + +//[callbacks_write_ec +AsyncAPI::errorcode write_ec( AsyncAPI & api, std::string const& data) { + boost::fibers::promise< AsyncAPI::errorcode > promise; + boost::fibers::future< AsyncAPI::errorcode > future( promise.get_future() ); + // We can confidently bind a reference to local variable 'promise' into + // the lambda callback because we know for a fact we're going to suspend + // (preserving the lifespan of both 'promise' and 'future') until the + // callback has fired. + api.init_write( data, + [&promise]( AsyncAPI::errorcode ec){ + promise.set_value( ec); + }); + return future.get(); +} +//] + +//[callbacks_write +void write( AsyncAPI & api, std::string const& data) { + AsyncAPI::errorcode ec = write_ec( api, data); + if ( ec) { + throw make_exception("write", ec); + } +} +//] + +//[callbacks_read_ec +std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) { + typedef std::pair< AsyncAPI::errorcode, std::string > result_pair; + boost::fibers::promise< result_pair > promise; + boost::fibers::future< result_pair > future( promise.get_future() ); + // We promise that both 'promise' and 'future' will survive until our + // lambda has been called. + api.init_read( [&promise]( AsyncAPI::errorcode ec, std::string const& data){ + promise.set_value( result_pair( ec, data) ); + }); + return future.get(); +} +//] + +//[callbacks_read +std::string read( AsyncAPI & api) { + boost::fibers::promise< std::string > promise; + boost::fibers::future< std::string > future( promise.get_future() ); + // Both 'promise' and 'future' will survive until our lambda has been + // called. + api.init_read( [&promise]( AsyncAPI::errorcode ec, std::string const& data){ + if ( ! ec) { + promise.set_value( data); + } else { + promise.set_exception( + std::make_exception_ptr( + make_exception("read", ec) ) ); + } + }); + return future.get(); +} +//] + +/***************************************************************************** +* helpers +*****************************************************************************/ +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode ec) { + std::ostringstream buffer; + buffer << "Error in AsyncAPI::" << desc << "(): " << ec; + return std::runtime_error( buffer.str() ); +} + +/***************************************************************************** +* driving logic +*****************************************************************************/ +int main( int argc, char *argv[]) { + AsyncAPI api; + + // successful write(): prime AsyncAPI with some data + write( api, "abcd"); + // successful read(): retrieve it + std::string data( read( api) ); + assert( data == "abcd"); + + // successful write_ec() + AsyncAPI::errorcode ec( write_ec( api, "efgh") ); + assert( ec == 0); + + // write_ec() with error + api.inject_error(1); + ec = write_ec( api, "ijkl"); + assert( ec == 1); + + // write() with error + std::string thrown; + api.inject_error(2); + try { + write(api, "mnop"); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert( thrown == make_exception("write", 2).what() ); + + // successful read_ec() +//[callbacks_read_ec_call + std::tie( ec, data) = read_ec( api); +//] + assert( ! ec); + assert( data == "efgh"); // last successful write_ec() + + // read_ec() with error + api.inject_error(3); + std::tie( ec, data) = read_ec( api); + assert( ec == 3); + // 'data' in unspecified state, don't test + + // read() with error + thrown.clear(); + api.inject_error(4); + try { + data = read(api); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert( thrown == make_exception("read", 4).what() ); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/examples/adapt_method_calls.cpp b/examples/adapt_method_calls.cpp new file mode 100644 index 00000000..7cfd78df --- /dev/null +++ b/examples/adapt_method_calls.cpp @@ -0,0 +1,167 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include // std::shared_ptr +#include +#include +#include +#include +#include +#include + +/***************************************************************************** +* example async API +*****************************************************************************/ +// introduce class-scope typedef +struct AsyncAPIBase { + // error callback accepts an int error code; 0 == success + typedef int errorcode; +}; + +//[Response +// every async operation receives a subclass instance of this abstract base +// class through which to communicate its result +struct Response { + typedef std::shared_ptr< Response > ptr; + + // called if the operation succeeds + virtual void success( std::string const& data) = 0; + + // called if the operation fails + virtual void error( AsyncAPIBase::errorcode ec) = 0; +}; +//] + +// the actual async API +class AsyncAPI: public AsyncAPIBase { +public: + // constructor acquires some resource that can be read + AsyncAPI( std::string const& data); + +//[method_init_read + // derive Response subclass, instantiate, pass Response::ptr + void init_read( Response::ptr); +//] + + // ... other operations ... + void inject_error( errorcode ec); + +private: + std::string data_; + errorcode injected_; +}; + +/***************************************************************************** +* fake AsyncAPI implementation... pay no attention to the little man behind +* the curtain... +*****************************************************************************/ +AsyncAPI::AsyncAPI( std::string const& data) : + data_( data), + injected_( 0) { +} + +void AsyncAPI::inject_error( errorcode ec) { + injected_ = ec; +} + +void AsyncAPI::init_read( Response::ptr response) { + // make a local copy of injected_ + errorcode injected( injected_); + // reset it synchronously with caller + injected_ = 0; + // local copy of data_ so we can capture in lambda + std::string data( data_); + // Simulate an asynchronous I/O operation by launching a detached thread + // that sleeps a bit before calling either completion method. + std::thread( [injected, response, data](){ + std::this_thread::sleep_for( std::chrono::milliseconds(100) ); + if ( ! injected) { + // no error, call success() + response->success( data); + } else { + // injected error, call error() + response->error( injected); + } + }).detach(); +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +// helper function +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode); + +//[PromiseResponse +class PromiseResponse: public Response { +public: + // called if the operation succeeds + virtual void success( std::string const& data) { + promise_.set_value( data); + } + + // called if the operation fails + virtual void error( AsyncAPIBase::errorcode ec) { + promise_.set_exception( + std::make_exception_ptr( + make_exception("read", ec) ) ); + } + + boost::fibers::future< std::string > get_future() { + return promise_.get_future(); + } + +private: + boost::fibers::promise< std::string > promise_; +}; +//] + +//[method_read +std::string read( AsyncAPI & api) { + // Because init_read() requires a shared_ptr, we must allocate our + // ResponsePromise on the heap, even though we know its lifespan. + auto promisep( std::make_shared< PromiseResponse >() ); + boost::fibers::future< std::string > future( promisep->get_future() ); + // Both 'promisep' and 'future' will survive until our lambda has been + // called. + api.init_read( promisep); + return future.get(); +} +//] + +/***************************************************************************** +* helpers +*****************************************************************************/ +std::runtime_error make_exception( std::string const& desc, AsyncAPI::errorcode ec) { + std::ostringstream buffer; + buffer << "Error in AsyncAPI::" << desc << "(): " << ec; + return std::runtime_error( buffer.str() ); +} + +/***************************************************************************** +* driving logic +*****************************************************************************/ +int main(int argc, char *argv[]) { + // prime AsyncAPI with some data + AsyncAPI api("abcd"); + + // successful read(): retrieve it + std::string data( read( api) ); + assert(data == "abcd"); + + // read() with error + std::string thrown; + api.inject_error(1); + try { + data = read( api); + } catch ( std::exception const& e) { + thrown = e.what(); + } + assert(thrown == make_exception("read", 1).what() ); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/examples/adapt_nonblocking.cpp b/examples/adapt_nonblocking.cpp new file mode 100644 index 00000000..034441cd --- /dev/null +++ b/examples/adapt_nonblocking.cpp @@ -0,0 +1,207 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include // std::min() +#include // EWOULDBLOCK +#include + +/***************************************************************************** +* example nonblocking API +*****************************************************************************/ +//[NonblockingAPI +class NonblockingAPI { +public: + NonblockingAPI(); + + // nonblocking operation: may return EWOULDBLOCK + int read( std::string & data, std::size_t desired); + +/*= ...*/ +//<- + // for simulating a real nonblocking API + void set_data( std::string const& data, std::size_t chunksize); + void inject_error( int ec); + +private: + std::string data_; + int injected_; + unsigned tries_; + std::size_t chunksize_; +//-> +}; +//] + +/***************************************************************************** +* fake NonblockingAPI implementation... pay no attention to the little man +* behind the curtain... +*****************************************************************************/ +NonblockingAPI::NonblockingAPI() : + injected_( 0), + tries_( 0), + chunksize_( 9999) { +} + +void NonblockingAPI::set_data( std::string const& data, std::size_t chunksize) { + data_ = data; + chunksize_ = chunksize; + // This delimits the start of a new test. Reset state. + injected_ = 0; + tries_ = 0; +} + +void NonblockingAPI::inject_error( int ec) { + injected_ = ec; +} + +int NonblockingAPI::read( std::string & data, std::size_t desired) { + // in case of error + data.clear(); + + if ( injected_) { + // copy injected_ because we're about to reset it + auto injected( injected_); + injected_ = 0; + // after an error situation, restart success count + tries_ = 0; + return injected; + } + + if ( ++tries_ < 5) { + // no injected error, but the resource isn't yet ready + return EWOULDBLOCK; + } + + // tell caller there's nothing left + if ( data_.empty() ) { + return EOF; + } + + // okay, finally have some data + // but return minimum of desired and chunksize_ + std::size_t size( ( std::min)( desired, chunksize_) ); + data = data_.substr( 0, size); + // strip off what we just returned + data_ = data_.substr( size); + // reset I/O retries count for next time + tries_ = 0; + // success + return 0; +} + +/***************************************************************************** +* adapters +*****************************************************************************/ +//[nonblocking_read_chunk +// guaranteed not to return EWOULDBLOCK +int read_chunk( NonblockingAPI & api, std::string & data, std::size_t desired) { + int error; + while ( EWOULDBLOCK == ( error = api.read( data, desired) ) ) { + // not ready yet -- try again on the next iteration of the + // application's main loop + boost::this_fiber::yield(); + } + return error; +} +//] + +//[nonblocking_read_desired +// keep reading until desired length, EOF or error +// may return both partial data and nonzero error +int read_desired( NonblockingAPI & api, std::string & data, std::size_t desired) { + // we're going to accumulate results into 'data' + data.clear(); + std::string chunk; + int error = 0; + while ( data.length() < desired && + ( error = read_chunk( api, chunk, desired - data.length() ) ) == 0) { + data.append( chunk); + } + return error; +} +//] + +//[nonblocking_IncompleteRead +// exception class augmented with both partially-read data and errorcode +class IncompleteRead : public std::runtime_error { +public: + IncompleteRead( std::string const& what, std::string const& partial, int ec) : + std::runtime_error( what), + partial_( partial), + ec_( ec) { + } + + std::string get_partial() const { + return partial_; + } + + int get_errorcode() const { + return ec_; + } + +private: + std::string partial_; + int ec_; +}; +//] + +//[nonblocking_read +// read all desired data or throw IncompleteRead +std::string read( NonblockingAPI & api, std::size_t desired) { + std::string data; + int ec( read_desired( api, data, desired) ); + + // for present purposes, EOF isn't a failure + if ( 0 == ec || EOF == ec) { + return data; + } + + // oh oh, partial read + std::ostringstream msg; + msg << "NonblockingAPI::read() error " << ec << " after " + << data.length() << " of " << desired << " characters"; + throw IncompleteRead( msg.str(), data, ec); +} +//] + +int main( int argc, char *argv[]) { + NonblockingAPI api; + const std::string sample_data("abcdefghijklmnopqrstuvwxyz"); + + // Try just reading directly from NonblockingAPI + api.set_data( sample_data, 5); + std::string data; + int ec = api.read( data, 13); + // whoops, underlying resource not ready + assert(ec == EWOULDBLOCK); + assert(data.empty()); + + // successful read() + api.set_data( sample_data, 5); + data = read( api, 13); + assert(data == "abcdefghijklm"); + + // read() with error + api.set_data( sample_data, 5); + // don't accidentally pick either EOF or EWOULDBLOCK + assert(EOF != 1); + assert(EWOULDBLOCK != 1); + api.inject_error(1); + int thrown = 0; + try { + data = read( api, 13); + } catch ( IncompleteRead const& e) { + thrown = e.get_errorcode(); + } + assert(thrown == 1); + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/examples/asio/daytime_client.cpp b/examples/asio/daytime_client.cpp new file mode 100644 index 00000000..c6801e3e --- /dev/null +++ b/examples/asio/daytime_client.cpp @@ -0,0 +1,98 @@ +// +// daytime_client.cpp +// ~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "loop.hpp" +#include "use_future.hpp" + +using boost::asio::ip::udp; + +void get_daytime(boost::asio::io_service& io_service, const char* hostname) +{ + try + { + udp::resolver resolver(io_service); + + boost::fibers::future iter = + resolver.async_resolve( + udp::resolver::query( udp::v4(), hostname, "daytime"), + boost::fibers::asio::use_future); + + // The async_resolve operation above returns the endpoint iterator as a + // future value that is not retrieved ... + + udp::socket socket(io_service, udp::v4()); + + boost::array send_buf = {{ 0 }}; + boost::fibers::future send_length = + socket.async_send_to(boost::asio::buffer(send_buf), + *iter.get(), // ... until here. This call may block. + boost::fibers::asio::use_future); + + // Do other things here while the send completes. + + send_length.get(); // Blocks until the send is complete. Throws any errors. + + boost::array recv_buf; + udp::endpoint sender_endpoint; + boost::fibers::future recv_length = + socket.async_receive_from( + boost::asio::buffer(recv_buf), + sender_endpoint, + boost::fibers::asio::use_future); + + // Do other things here while the receive completes. + + std::cout.write( + recv_buf.data(), + recv_length.get()); // Blocks until receive is complete. + } + catch (boost::system::system_error& e) + { + std::cerr << e.what() << std::endl; + } + io_service.stop(); +} + +int main( int argc, char* argv[]) +{ + boost::asio::io_service io_service; + try + { + if (argc != 2) + { + std::cerr << "Usage: daytime_client " << std::endl; + return 1; + } + + boost::fibers::fiber( + boost::bind( get_daytime, + boost::ref( io_service), argv[1]) ).detach(); + + boost::fibers::asio::run_service( io_service); + } + catch ( std::exception& e) + { + std::cerr << e.what() << std::endl; + } + + return 0; +} diff --git a/examples/asio/daytime_client2.cpp b/examples/asio/daytime_client2.cpp new file mode 100644 index 00000000..0460aca9 --- /dev/null +++ b/examples/asio/daytime_client2.cpp @@ -0,0 +1,89 @@ +// +// daytime_client.cpp +// ~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "loop.hpp" +#include "yield.hpp" + +using boost::asio::ip::udp; + +void get_daytime(boost::asio::io_service& io_service, const char* hostname) +{ + try + { + udp::resolver resolver(io_service); + + udp::resolver::iterator iter = + resolver.async_resolve( + udp::resolver::query( udp::v4(), hostname, "daytime"), + boost::fibers::asio::yield); + + udp::socket socket(io_service, udp::v4()); + + boost::array send_buf = {{ 0 }}; + std::size_t send_length = + socket.async_send_to(boost::asio::buffer(send_buf), + *iter, boost::fibers::asio::yield); + (void)send_length; + + boost::array recv_buf; + udp::endpoint sender_endpoint; + std::size_t recv_length = + socket.async_receive_from( + boost::asio::buffer(recv_buf), + sender_endpoint, + boost::fibers::asio::yield); + + std::cout.write( + recv_buf.data(), + recv_length); + } + catch (boost::system::system_error& e) + { + std::cerr << e.what() << std::endl; + } + io_service.stop(); +} + +int main( int argc, char* argv[]) +{ + boost::asio::io_service io_service; + try + { + if (argc != 2) + { + std::cerr << "Usage: daytime_client " << std::endl; + return 1; + } + + boost::fibers::fiber( + boost::bind( get_daytime, + boost::ref( io_service), argv[1]) ).detach(); + + boost::fibers::asio::run_service( io_service); + } + catch ( std::exception& e) + { + std::cerr << e.what() << std::endl; + } + + return 0; +} diff --git a/examples/asio/detail/promise_handler.hpp b/examples/asio/detail/promise_handler.hpp new file mode 100644 index 00000000..4f612178 --- /dev/null +++ b/examples/asio/detail/promise_handler.hpp @@ -0,0 +1,172 @@ +// +// promise_handler.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP +#define BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP + +#include + +#include +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +// Completion handler to adapt a promise as a completion handler. +//[fibers_asio_promise_handler_base +template< typename T > +class promise_handler_base { +public: + typedef std::shared_ptr< boost::fibers::promise< T > > promise_ptr; + + // Construct from any promise_completion_token subclass special value. + template< typename Allocator > + promise_handler_base( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : + promise_( std::make_shared< boost::fibers::promise< T > >( + std::allocator_arg, pct.get_allocator() ) ) +//<- + , ecp_( pct.ec_) +//-> + {} + + bool should_set_value( boost::system::error_code const& ec) { + if ( ! ec) { + // whew, success + return true; + } + +//<- + // ec indicates error + if ( ecp_) { + // promise_completion_token bound an error_code variable: set it + * ecp_ = ec; + // This is the odd case: although there's an error, user code + // expressly forbid us to call set_exception(). We've set the + // bound error code -- but future::get() will wait forever unless + // we kick the promise SOMEHOW. Tell subclass to call set_value() + // anyway. + return true; + } +//-> + // no bound error_code: cause promise_ to throw an exception + promise_->set_exception( + std::make_exception_ptr( + boost::system::system_error( ec) ) ); + // caller should NOT call set_value() + return false; + } + + promise_ptr get_promise() const { + return promise_; + } + +private: + promise_ptr promise_; +//<- + boost::system::error_code * ecp_; +//-> +}; +//] + +// generic promise_handler for arbitrary value +//[fibers_asio_promise_handler +template< typename T > +class promise_handler : public promise_handler_base< T > { +private: +//<- + using promise_handler_base< T >::should_set_value; + +//-> +public: + // Construct from any promise_completion_token subclass special value. + template< typename Allocator > + promise_handler( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : + promise_handler_base< T >( pct) { + } + +//<- + void operator()( T t) { + get_promise()->set_value( t); + } +//-> + void operator()( boost::system::error_code const& ec, T t) { + if ( should_set_value( ec) ) { + get_promise()->set_value( t); + } + } +//<- + using typename promise_handler_base< T >::promise_ptr; + using promise_handler_base< T >::get_promise; +//-> +}; +//] + +// specialize promise_handler for void +template<> +class promise_handler< void > : public promise_handler_base< void > { +private: + using promise_handler_base< void >::should_set_value; + +public: + // Construct from any promise_completion_token subclass special value. + template< typename Allocator > + promise_handler( boost::fibers::asio::promise_completion_token< Allocator > const& pct) : + promise_handler_base< void >( pct) { + } + + void operator()() { + get_promise()->set_value(); + } + + void operator()( boost::system::error_code const& ec) { + if ( should_set_value( ec) ) { + get_promise()->set_value(); + } + } + + using promise_handler_base< void >::promise_ptr; + using promise_handler_base< void >::get_promise; +}; + +}}} + +namespace asio { +namespace detail { + +// Specialize asio_handler_invoke hook to ensure that any exceptions thrown +// from the handler are propagated back to the caller via the future. +template< typename Function, typename T > +void asio_handler_invoke( Function f, fibers::asio::detail::promise_handler< T > * h) { + typename fibers::asio::detail::promise_handler< T >::promise_ptr + p( h->get_promise() ); + try { + f(); + } catch (...) { + p->set_exception( std::current_exception() ); + } +} + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_PROMISE_HANDLER_HPP diff --git a/examples/asio/detail/use_future.hpp b/examples/asio/detail/use_future.hpp new file mode 100644 index 00000000..349e2e09 --- /dev/null +++ b/examples/asio/detail/use_future.hpp @@ -0,0 +1,99 @@ +// +// use_future.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP +#define BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP + +#include +#include + +#include + +#include "promise_handler.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +// use_future_handler is just an alias for promise_handler -- but we must +// distinguish this case to specialize async_result below. +template< typename T > +using use_future_handler = promise_handler< T >; + +}}} + +namespace asio { + +// Handler traits specialisation for use_future_handler. +template< typename T > +class async_result< fibers::asio::detail::use_future_handler< T > > { +public: + // The initiating function will return a future. + typedef boost::fibers::future< T > type; + + // Constructor creates a new promise for the async operation, and obtains the + // corresponding future. + explicit async_result( fibers::asio::detail::use_future_handler< T > & h) { + value_ = h.get_promise()->get_future(); + } + + // Obtain the future to be returned from the initiating function. + type get() { + return boost::move( value_); + } + +private: + type value_; +}; + +// Handler type specialisation for use_future for a nullary callback. +template< typename Allocator, typename ReturnType > +struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType() > { + typedef fibers::asio::detail::use_future_handler< void > type; +}; + +// Handler type specialisation for use_future for a single-argument callback. +template< typename Allocator, typename ReturnType, typename Arg1 > +struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( Arg1) > { + typedef fibers::asio::detail::use_future_handler< Arg1 > type; +}; + +// Handler type specialisation for use_future for a callback passed only +// boost::system::error_code. Note the use of use_future_handler: an +// error_code indicating error will be conveyed to consumer code via +// set_exception(). +template< typename Allocator, typename ReturnType > +struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( boost::system::error_code) > { + typedef fibers::asio::detail::use_future_handler< void > type; +}; + +// Handler type specialisation for use_future for a callback passed +// boost::system::error_code plus an arbitrary value. Note the use of a +// single-argument use_future_handler: an error_code indicating error will be +// conveyed to consumer code via set_exception(). +template< typename Allocator, typename ReturnType, typename Arg2 > +struct handler_type< boost::fibers::asio::use_future_t< Allocator >, ReturnType( boost::system::error_code, Arg2) > { + typedef fibers::asio::detail::use_future_handler< Arg2 > type; +}; + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP diff --git a/examples/asio/detail/yield.hpp b/examples/asio/detail/yield.hpp new file mode 100644 index 00000000..f18b1e1a --- /dev/null +++ b/examples/asio/detail/yield.hpp @@ -0,0 +1,109 @@ +// +// yield.hpp +// ~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP +#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP + +#include +#include + +#include + +#include "promise_handler.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +// yield_handler is just an alias for promise_handler -- but we must +// distinguish this case to specialize async_result below. +//[fibers_asio_yield_handler +template< typename T > +using yield_handler = promise_handler< T >; +//] + +}}} + +namespace asio { + +// Handler traits specialisation for yield_handler. +template< typename T > +class async_result< fibers::asio::detail::yield_handler< T > > { +public: + // The initiating function will return a value of type T. + typedef T type; + + // Constructor creates a new promise for the async operation, and obtains the + // corresponding future. + explicit async_result( fibers::asio::detail::yield_handler< T > & h) { + future_ = h.get_promise()->get_future(); + } + + // This blocks the calling fiber until the handler sets either a value or + // an exception. + type get() { + return future_.get(); + } + +private: + fibers::future< T > future_; +}; + +// Handler type specialisation for yield for a nullary callback. +template< typename Allocator, typename ReturnType > +struct handler_type< boost::fibers::asio::yield_t< Allocator >, + ReturnType() > { + typedef boost::fibers::asio::detail::yield_handler< void > type; +}; + +// Handler type specialisation for yield for a single-argument callback. +template< typename Allocator, typename ReturnType, typename Arg1 > +struct handler_type< boost::fibers::asio::yield_t< Allocator >, + ReturnType( Arg1) > { + typedef fibers::asio::detail::yield_handler< Arg1 > type; +}; + +// Handler type specialisation for yield for a callback passed only +// boost::system::error_code. Note the use of yield_handler: an +// error_code indicating error will be conveyed to consumer code via an +// exception. Normal return implies (! error_code). +template< typename Allocator, typename ReturnType > +struct handler_type< boost::fibers::asio::yield_t< Allocator >, + ReturnType( boost::system::error_code) > { + typedef fibers::asio::detail::yield_handler< void > type; +}; + +// Handler type specialisation for yield for a callback passed +// boost::system::error_code plus an arbitrary value. Note the use of a +// single-argument yield_handler: an error_code indicating error will be +// conveyed to consumer code via an exception. Normal return implies (! +// error_code). +//[asio_handler_type +template< typename Allocator, typename ReturnType, typename Arg2 > +struct handler_type< boost::fibers::asio::yield_t< Allocator >, + ReturnType( boost::system::error_code, Arg2) > { + typedef fibers::asio::detail::yield_handler< Arg2 > type; +}; +//] + +}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP diff --git a/examples/asio/echo_client.cpp b/examples/asio/echo_client.cpp new file mode 100644 index 00000000..91dca760 --- /dev/null +++ b/examples/asio/echo_client.cpp @@ -0,0 +1,63 @@ +// +// echo_client.cpp +// ~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// This source is effectively identical to +// http://www.boost.org/doc/libs/release/doc/html/boost_asio/example/cpp03/echo/blocking_tcp_echo_client.cpp +// It does not use Boost.Fiber. It is copied here only for completeness. A +// server needs a client. +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include + +using boost::asio::ip::tcp; + +enum { max_length = 1024 }; + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: echo_client \n"; + return 1; + } + + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), argv[1], argv[2]); + tcp::resolver::iterator iterator = resolver.resolve(query); + + tcp::socket s(io_service); + boost::asio::connect(s, iterator); + + using namespace std; // For strlen. + std::cout << "Enter message: "; + char request[max_length]; + std::cin.getline(request, max_length); + size_t request_length = strlen(request); + boost::asio::write(s, boost::asio::buffer(request, request_length)); + + char reply[max_length]; + size_t reply_length = boost::asio::read(s, + boost::asio::buffer(reply, request_length)); + std::cout << "Reply is: "; + std::cout.write(reply, reply_length); + std::cout << "\n"; + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; +} diff --git a/examples/asio/echo_client2.cpp b/examples/asio/echo_client2.cpp new file mode 100644 index 00000000..5cee2554 --- /dev/null +++ b/examples/asio/echo_client2.cpp @@ -0,0 +1,68 @@ +// +// echo_client2.cpp +// ~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// This source is almost identical to +// http://www.boost.org/doc/libs/release/doc/html/boost_asio/example/cpp03/echo/blocking_tcp_echo_client.cpp +// save that it deliberately introduces a timeout. +// It does not use Boost.Fiber. It is copied here only for completeness. A +// server needs a client. +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include +#include + +using boost::asio::ip::tcp; + +enum { max_length = 1024 }; + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: echo_client2 \n"; + return 1; + } + + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), argv[1], argv[2]); + tcp::resolver::iterator iterator = resolver.resolve(query); + + tcp::socket s(io_service); + boost::asio::connect(s, iterator); + + using namespace std; // For strlen. + std::cout << "Enter message: "; + char request[max_length]; + std::cin.getline(request, max_length); + size_t request_length = strlen(request); + boost::asio::write(s, boost::asio::buffer(request, request_length)); + + char reply[max_length]; + size_t reply_length = boost::asio::read(s, + boost::asio::buffer(reply, request_length)); + std::cout << "Reply is: "; + std::cout.write(reply, reply_length); + std::cout << "\nWe block for 10 seconds in order to let session timeout\n"; + boost::this_thread::sleep( boost::posix_time::seconds( 10) ); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; +} diff --git a/examples/asio/echo_server.cpp b/examples/asio/echo_server.cpp new file mode 100644 index 00000000..3b2b134d --- /dev/null +++ b/examples/asio/echo_server.cpp @@ -0,0 +1,101 @@ +// +// echo_server.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// 2013 Oliver Kowalke +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +#include +#include +#include + +#include + +#include "loop.hpp" // run_service() +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr< tcp::socket > socket_ptr; + +void session( socket_ptr sock) +{ + try + { + for (;;) + { + char data[max_length]; + + boost::system::error_code ec; + std::size_t length = sock->async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) + break; //connection closed cleanly by peer + else if ( ec) + throw boost::system::system_error( ec); //some other error + + boost::asio::async_write( + * sock, + boost::asio::buffer( data, length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) + break; //connection closed cleanly by peer + else if ( ec) + throw boost::system::system_error( ec); //some other error + } + } + catch ( std::exception const& e) + { std::cerr << "Exception in fiber: " << e.what() << "\n"; } +} + +void server( boost::asio::io_service & io_service, unsigned short port) +{ + tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port) ); + for (;;) + { + socket_ptr socket( new tcp::socket( io_service) ); + boost::system::error_code ec; + std::cout << "wait for accept" << std::endl; + a.async_accept( + * socket, + boost::fibers::asio::yield[ec]); + std::cout << "accepted" << std::endl; + if ( ! ec) { + boost::fibers::fiber( + boost::bind( session, socket) ).detach(); + } + } +} + +int main( int argc, char* argv[]) +{ + try + { + if ( argc != 2) + { + std::cerr << "Usage: echo_server \n"; + return 1; + } + + boost::asio::io_service io_service; + + boost::fibers::fiber( + boost::bind( server, boost::ref( io_service), std::atoi( argv[1]) ) ).detach(); + + boost::fibers::asio::run_service( io_service); + } + catch ( std::exception const& e) + { std::cerr << "Exception: " << e.what() << "\n"; } + + return 0; +} diff --git a/examples/asio/echo_server2.cpp b/examples/asio/echo_server2.cpp new file mode 100644 index 00000000..20d81207 --- /dev/null +++ b/examples/asio/echo_server2.cpp @@ -0,0 +1,137 @@ +// +// echo_server2.cpp +// ~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// 2013 Oliver Kowalke +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// + +#include +#include + +#include +#include +#include +#include + +#include + +#include "loop.hpp" +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +class session : public boost::enable_shared_from_this< session > +{ +public: + explicit session( boost::asio::io_service & io_service) : + strand_( io_service), + socket_( io_service), + timer_( io_service) + {} + + tcp::socket& socket() + { return socket_; } + + void go() + { + boost::fibers::fiber( + boost::bind(&session::echo, + shared_from_this())).detach(); + boost::fibers::fiber( + boost::bind(&session::timeout, + shared_from_this())).detach(); + } + +private: + void echo() + { + try + { + char data[max_length]; + for (;;) + { + timer_.expires_from_now( + boost::posix_time::seconds( 3) ); + std::size_t n = socket_.async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield); + boost::asio::async_write( + socket_, + boost::asio::buffer( data, n), + boost::fibers::asio::yield); + } + } + catch ( std::exception const& e) + { + socket_.close(); + timer_.cancel(); + } + } + + void timeout() + { + while ( socket_.is_open() ) + { + boost::system::error_code ignored_ec; + timer_.async_wait( boost::fibers::asio::yield[ignored_ec]); + if ( timer_.expires_from_now() <= boost::posix_time::seconds( 0) ) { + std::cout << "session to " << socket_.remote_endpoint() << " timed out" << std::endl; + socket_.close(); + } + } + } + + boost::asio::io_service::strand strand_; + tcp::socket socket_; + boost::asio::deadline_timer timer_; +}; + +void do_accept(boost::asio::io_service& io_service, + unsigned short port) +{ + tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); + + for (;;) + { + boost::system::error_code ec; + boost::shared_ptr< session > new_session( new session( io_service) ); + acceptor.async_accept( + new_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + boost::fibers::fiber( boost::bind( & session::go, new_session) ).detach(); + } + } +} + +int main( int argc, char* argv[]) +{ + try + { + if ( argc != 2) + { + std::cerr << "Usage: echo_server \n"; + return 1; + } + + boost::asio::io_service io_service; + + using namespace std; // For atoi. + boost::fibers::fiber( + boost::bind( do_accept, + boost::ref( io_service), atoi( argv[1])) ).detach(); + + boost::fibers::asio::run_service( io_service); + } + catch ( std::exception const& e) + { std::cerr << "Exception: " << e.what() << "\n"; } + + return 0; +} diff --git a/examples/asio/loop.hpp b/examples/asio/loop.hpp new file mode 100644 index 00000000..be8538ab --- /dev/null +++ b/examples/asio/loop.hpp @@ -0,0 +1,43 @@ + +// Copyright Eugene Yakubovich 2014. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#include +#include +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +//[timer_handler +inline void timer_handler( boost::asio::high_resolution_timer & timer) { + boost::this_fiber::yield(); + timer.expires_from_now( boost::fibers::wait_interval() ); + timer.async_wait( std::bind( timer_handler, std::ref( timer) ) ); +} +//] + +//[run_service +inline void run_service( boost::asio::io_service & io_service) { + boost::asio::high_resolution_timer timer( io_service, std::chrono::seconds(0) ); + timer.async_wait( std::bind( timer_handler, std::ref( timer) ) ); + io_service.run(); +} +//] + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/examples/asio/promise_completion_token.hpp b/examples/asio/promise_completion_token.hpp new file mode 100644 index 00000000..e8e9a590 --- /dev/null +++ b/examples/asio/promise_completion_token.hpp @@ -0,0 +1,88 @@ +// +// promise_completion_token.hpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP +#define BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +/// Common base class for yield_t and use_future_t. See also yield.hpp and +/// use_future.hpp. +/** + * The awkward name of this class is because it's not intended to be used + * directly in user code: it's the common base class for a couple of user- + * facing placeholder classes yield_t and use_future_t. They + * share a common handler class promise_handler. + * + * Each subclass (e.g. use_future_t) has a canonical instance + * (use_future). These may be used in the following ways as a + * Boost.Asio asynchronous operation completion token: + * + *
+ *
boost::fibers::asio::use_future
+ *
This is the canonical instance of use_future_t, provided + * solely for convenience. It causes promise_handler to allocate its + * internal boost::fibers::promise using a default-constructed + * default allocator (std::allocator).
+ *
boost::fibers::asio::use_future::with(alloc_instance)
+ *
This usage specifies an alternate allocator instance + * alloc_instance. It causes promise_handler to allocate its + * internal boost::fibers::promise using the specified + * allocator.
+ *
+ */ +//[fibers_asio_promise_completion_token +template< typename Allocator > +class promise_completion_token { +public: + typedef Allocator allocator_type; + + /// Construct using default-constructed allocator. + BOOST_CONSTEXPR promise_completion_token() : + ec_( nullptr) { + } + + /// Construct using specified allocator. + explicit promise_completion_token( Allocator const& allocator) : + ec_( nullptr), + allocator_( allocator) { + } + + /// Obtain allocator. + allocator_type get_allocator() const { + return allocator_; + } + +//private: + // used by some subclasses to bind an error_code to suppress exceptions + boost::system::error_code * ec_; + +private: + Allocator allocator_; +}; +//] + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_PROMISE_COMPLETION_TOKEN_HPP diff --git a/examples/asio/publish_subscribe/publisher.cpp b/examples/asio/publish_subscribe/publisher.cpp new file mode 100644 index 00000000..6ed39745 --- /dev/null +++ b/examples/asio/publish_subscribe/publisher.cpp @@ -0,0 +1,62 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include +#include + +using boost::asio::ip::tcp; + +enum { max_length = 1024 }; + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: publisher \n"; + return 1; + } + + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), argv[1], "9997"); + tcp::resolver::iterator iterator = resolver.resolve(query); + + tcp::socket s(io_service); + boost::asio::connect(s, iterator); + + char msg[max_length]; + std::string channel(argv[2]); + std::memset(msg, '\0', max_length); + std::memcpy(msg, channel.c_str(), channel.size() ); + boost::asio::write(s, boost::asio::buffer(msg, max_length)); + + for (;;) + { + std::cout << "publish: "; + char request[max_length]; + std::cin.getline(request, max_length); + boost::asio::write(s, boost::asio::buffer(request, max_length)); + } + } + catch ( std::exception const& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; +} diff --git a/examples/asio/publish_subscribe/server.cpp b/examples/asio/publish_subscribe/server.cpp new file mode 100644 index 00000000..8c92b7d8 --- /dev/null +++ b/examples/asio/publish_subscribe/server.cpp @@ -0,0 +1,421 @@ +// +// server.cpp +// ~~~~~~~~~~~~~~~ +// +// Copyright (c) 2013 Oliver Kowalke +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "../loop.hpp" +#include "../yield.hpp" + +using boost::asio::ip::tcp; + +const std::size_t max_length = 1024; + +class subscriber_session; +typedef boost::shared_ptr subscriber_session_ptr; + +// a channel has n subscribers (subscriptions) +// this class holds a list of subcribers for one channel +class subscriptions +{ +public: + ~subscriptions(); + + // subscribe to this channel + void subscribe( subscriber_session_ptr const& s) + { subscribers_.insert( s); } + + // unsubscribe from this channel + void unsubscribe( subscriber_session_ptr const& s) + { subscribers_.erase(s); } + + // publish a message, e.g. push this message to all subscribers + void publish( std::string const& msg); + +private: + // list of subscribers + std::set subscribers_; +}; + +// a class to register channels and to subsribe clients to this channels +class registry : private boost::noncopyable +{ +private: + typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont; + typedef channels_cont::iterator channels_iter; + + boost::fibers::mutex mtx_; + channels_cont channels_; + + void register_channel_( std::string const& channel) + { + if ( channels_.end() != channels_.find( channel) ) + throw std::runtime_error("channel already exists"); + channels_[channel] = boost::make_shared< subscriptions >(); + std::cout << "new channel '" << channel << "' registered" << std::endl; + } + + void unregister_channel_( std::string const& channel) + { + channels_.erase( channel); + std::cout << "channel '" << channel << "' unregistered" << std::endl; + } + + void subscribe_( std::string const& channel, subscriber_session_ptr s) + { + channels_iter iter = channels_.find( channel); + if ( channels_.end() == iter ) + throw std::runtime_error("channel does not exist"); + iter->second->subscribe( s); + std::cout << "new subscription to channel '" << channel << "'" << std::endl; + } + + void unsubscribe_( std::string const& channel, subscriber_session_ptr s) + { + channels_iter iter = channels_.find( channel); + if ( channels_.end() != iter ) + iter->second->unsubscribe( s); + } + + void publish_( std::string const& channel, std::string const& msg) + { + channels_iter iter = channels_.find( channel); + if ( channels_.end() == iter ) + throw std::runtime_error("channel does not exist"); + iter->second->publish( msg); + std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl; + } + +public: + // add a channel to registry + void register_channel( std::string const& channel) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + register_channel_( channel); + } + + // remove a channel from registry + void unregister_channel( std::string const& channel) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unregister_channel_( channel); + } + + // subscribe to a channel + void subscribe( std::string const& channel, subscriber_session_ptr s) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + subscribe_( channel, s); + } + + // unsubscribe from a channel + void unsubscribe( std::string const& channel, subscriber_session_ptr s) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + unsubscribe_( channel, s); + } + + // publish a message to all subscribers registerd to the channel + void publish( std::string const& channel, std::string const& msg) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + publish_( channel, msg); + } +}; + +// a subscriber subscribes to a given channel in order to receive messages published on this channel +class subscriber_session : public boost::enable_shared_from_this< subscriber_session > +{ +public: + explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) : + socket_( io_service), + reg_( reg) + {} + + tcp::socket& socket() + { return socket_; } + + // this function is executed inside the fiber + void run() + { + std::string channel; + try + { + boost::system::error_code ec; + + // read first message == channel name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data_), + boost::fibers::asio::yield[ec]); + if ( ec) throw std::runtime_error("no channel from subscriber"); + // first message ist equal to the channel name the publisher + // publishes to + channel = data_; + + // subscribe to new channel + reg_.subscribe( channel, shared_from_this() ); + + // read published messages + for (;;) + { + // wait for a conditon-variable for new messages + // the fiber will be suspended until the condtion + // becomes true and the fiber is resumed + // published message is stored in buffer 'data_' + std::unique_lock< boost::fibers::mutex > lk( mtx_); + cond_.wait( lk); + std::string data( data_); + lk.unlock(); + std::cout << "subscriber::run(): '" << data << std::endl; + + // message '' terminates subscription + if ( "" == data) break; + + // async. write message to socket connected with + // subscriber + // async_write() returns if the complete message was writen + // the fiber is suspended in the meanwhile + boost::asio::async_write( + socket_, + boost::asio::buffer( data, data.size() ), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) + break; //connection closed cleanly by peer + else if ( ec) + throw boost::system::system_error( ec); //some other error + std::cout << "subscriber::run(): '" << data << " written" << std::endl; + } + } + catch ( std::exception const& e) + { std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl; } + + // close socket + socket_.close(); + // unregister channel + reg_.unsubscribe( channel, shared_from_this() ); + } + + // called from publisher_session (running in other fiber) + void publish( std::string const& msg) + { + std::unique_lock< boost::fibers::mutex > lk( mtx_); + std::memset(data_, '\0', sizeof( data_)); + std::memcpy(data_, msg.c_str(), (std::min)(max_length, msg.size())); + cond_.notify_one(); + } + +private: + tcp::socket socket_; + registry & reg_; + boost::fibers::mutex mtx_; + boost::fibers::condition_variable cond_; + // fixed size message + char data_[max_length]; +}; + + +subscriptions::~subscriptions() +{ + BOOST_FOREACH( subscriber_session_ptr s, subscribers_) + { s->publish(""); } +} + +void +subscriptions::publish( std::string const& msg) +{ + BOOST_FOREACH( subscriber_session_ptr s, subscribers_) + { s->publish( msg); } +} + +// a publisher publishes messages on its channel +// subscriber might register to this channel to get the published messages +class publisher_session : public boost::enable_shared_from_this< publisher_session > +{ +public: + explicit publisher_session( boost::asio::io_service & io_service, registry & reg) : + socket_( io_service), + reg_( reg) + {} + + tcp::socket& socket() + { return socket_; } + + // this function is executed inside the fiber + void run() + { + std::string channel; + try + { + boost::system::error_code ec; + + // fixed size message + char data[max_length]; + + // read first message == channel name + // async_ready() returns if the the complete message is read + // until this the fiber is suspended until the complete message + // is read int the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec) throw std::runtime_error("no channel from publisher"); + // first message ist equal to the channel name the publisher + // publishes to + channel = data; + + // register the new channel + reg_.register_channel( channel); + + // start publishing messages + for (;;) + { + // read message from publisher asyncronous + // async_read() suspends this fiber until the complete emssage is read + // and stored in the given buffer 'data' + boost::asio::async_read( + socket_, + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) + break; //connection closed cleanly by peer + else if ( ec) + throw boost::system::system_error( ec); //some other error + + // publish message to all subscribers + reg_.publish( channel, std::string( data) ); + } + } + catch ( std::exception const& e) + { std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl; } + + // close socket + socket_.close(); + // unregister channel + reg_.unregister_channel( channel); + } + +private: + tcp::socket socket_; + registry & reg_; +}; + +typedef boost::shared_ptr< publisher_session > publisher_session_ptr; + +// function accepts connections requests from clients acting as a publisher +void accept_publisher( boost::asio::io_service& io_service, + unsigned short port, + registry & reg) +{ + // create TCP-acceptor + tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); + + // loop for accepting connection requests + for (;;) + { + boost::system::error_code ec; + // create new publisher-session + // this instance will be associated with one publisher + publisher_session_ptr new_publisher_session = + boost::make_shared( boost::ref( io_service), boost::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (publisher) + // is connected + acceptor.async_accept( + new_publisher_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new publisher in its own fiber (one fiber for one client) + boost::fibers::fiber( + boost::bind( & publisher_session::run, new_publisher_session) ).detach(); + } + } +} + +// function accepts connections requests from clients acting as a subscriber +void accept_subscriber( boost::asio::io_service& io_service, + unsigned short port, + registry & reg) +{ + // create TCP-acceptor + tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) ); + + // loop for accepting connection requests + for (;;) + { + boost::system::error_code ec; + // create new subscriber-session + // this instance will be associated with one subscriber + subscriber_session_ptr new_subscriber_session = + boost::make_shared( boost::ref( io_service), boost::ref( reg) ); + // async. accept of new connection request + // this function will suspend this execution context (fiber) until a + // connection was established, after returning from this function a new client (subscriber) + // is connected + acceptor.async_accept( + new_subscriber_session->socket(), + boost::fibers::asio::yield[ec]); + if ( ! ec) { + // run the new subscriber in its own fiber (one fiber for one client) + boost::fibers::fiber( + boost::bind( & subscriber_session::run, new_subscriber_session) ).detach(); + } + } +} + + +int main( int argc, char* argv[]) +{ + try + { + // create io_service for async. I/O + boost::asio::io_service io_service; + + // registry for channels and its subscription + registry reg; + + // create an acceptor for publishers, run it as fiber + boost::fibers::fiber( + boost::bind( accept_publisher, + boost::ref( io_service), 9997, boost::ref( reg)) ).detach(); + + // create an acceptor for subscribers, run it as fiber + boost::fibers::fiber( + boost::bind( accept_subscriber, + boost::ref( io_service), 9998, boost::ref( reg)) ).detach(); + + boost::fibers::asio::run_service( io_service); + } + catch ( std::exception const& e) + { std::cerr << "Exception: " << e.what() << "\n"; } + + return 0; +} diff --git a/examples/asio/publish_subscribe/subscriber.cpp b/examples/asio/publish_subscribe/subscriber.cpp new file mode 100644 index 00000000..1362730a --- /dev/null +++ b/examples/asio/publish_subscribe/subscriber.cpp @@ -0,0 +1,64 @@ +// +// blocking_tcp_echo_client.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include + +#include +#include +#include + +using boost::asio::ip::tcp; + +enum { max_length = 1024 }; + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 3) + { + std::cerr << "Usage: subscriber \n"; + return 1; + } + + boost::asio::io_service io_service; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), argv[1], "9998"); + tcp::resolver::iterator iterator = resolver.resolve(query); + + tcp::socket s(io_service); + boost::asio::connect(s, iterator); + + char msg[max_length]; + std::string channel(argv[2]); + std::memset(msg, '\0', max_length); + std::memcpy(msg, channel.c_str(), channel.size() ); + boost::asio::write(s, boost::asio::buffer(msg, max_length)); + + for (;;) + { + char reply[max_length]; + size_t reply_length = s.read_some( + boost::asio::buffer(reply, max_length)); + std::cout << "published: "; + std::cout.write(reply, reply_length); + std::cout << std::endl; + } + } + catch (std::exception const& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; +} diff --git a/examples/asio/use_future.hpp b/examples/asio/use_future.hpp new file mode 100644 index 00000000..66c10b0a --- /dev/null +++ b/examples/asio/use_future.hpp @@ -0,0 +1,74 @@ +// +// use_future.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_USE_FUTURE_HPP +#define BOOST_FIBERS_ASIO_USE_FUTURE_HPP + +#include // std::allocator +#include +#include "promise_completion_token.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +/// Class used to specify that a Boost.Asio asynchronous operation should +/// return a future. +/** + * The use_future_t class is used to indicate that a Boost.Asio asynchronous + * operation should return a boost::fibers::future object. A use_future_t + * object may be passed as a handler to an asynchronous operation, typically + * using the special value @c boost::fibers::asio::use_future. For example: + * + * @code boost::fibers::future my_future + * = my_socket.async_read_some(my_buffer, boost::fibers::asio::use_future); @endcode + * + * The initiating function (async_read_some in the above example) returns a + * future that will receive the result of the operation. If the operation + * completes with an error_code indicating failure, it is converted into a + * system_error and passed back to the caller via the future. + */ +template< typename Allocator = std::allocator< void > > +class use_future_t : public promise_completion_token< Allocator > { +public: + /// Construct using default-constructed allocator. + BOOST_CONSTEXPR use_future_t() { + } + + /// Construct using specified allocator. + explicit use_future_t( Allocator const& allocator) : + promise_completion_token( allocator) { + } + + /// Specify an alternate allocator. + template< typename OtherAllocator > + use_future_t< OtherAllocator > operator[]( OtherAllocator const& allocator) const { + return use_future_t< OtherAllocator >( allocator); + } +}; + +/// A special value, similar to std::nothrow. +BOOST_CONSTEXPR_OR_CONST use_future_t<> use_future; + +}}} // namespace asio + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include "detail/use_future.hpp" + +#endif // BOOST_FIBERS_ASIO_USE_FUTURE_HPP diff --git a/examples/asio/yield.hpp b/examples/asio/yield.hpp new file mode 100644 index 00000000..1e32e626 --- /dev/null +++ b/examples/asio/yield.hpp @@ -0,0 +1,127 @@ +// +// yield.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke and Nat Goodspeed +// + +#ifndef BOOST_FIBERS_ASIO_YIELD_HPP +#define BOOST_FIBERS_ASIO_YIELD_HPP + +#include // std::allocator +#include +#include "promise_completion_token.hpp" + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +/// Class used to specify that a Boost.Asio asynchronous operation should +/// suspend the calling fiber until completion. +/** + * The yield_t class is used to indicate that a Boost.Asio asynchronous + * operation should suspend the calling fiber until its completion. The + * asynchronous function will either return a suitable value, or will throw an + * exception indicating the error. A yield_t object may be passed as a handler + * to an asynchronous operation, typically using the special value @c + * boost::fibers::asio::yield. For example: + * + * @code std::size_t length_read + * = my_socket.async_read_some(my_buffer, boost::fibers::asio::yield); @endcode + * + * The initiating function (async_read_some in the above example) does not + * return to the calling fiber until the asynchronous read has completed. Like + * its synchronous counterpart, it returns the result of the operation. If the + * operation completes with an error_code indicating failure, it is converted + * into a system_error and thrown as an exception. + * + * To suppress a possible error exception: + * @code + * boost::system::error_code ec; + * std::size_t length_read = + * my_socket.async_read_some(my_buffer, boost::fibers::asio::yield[ec]); + * // test ec for success + * @endcode + * + * The crucial distinction between + * @code + * std::size_t length_read = my_socket.read_some(my_buffer); + * @endcode + * and + * @code + * std::size_t length_read = + * my_socket.async_read_some(my_buffer, boost::fibers::asio::yield); + * @code + * is that read_some() blocks the entire calling @em thread, whereas + * async_read_some(..., boost::fibers::asio::yield) blocks only the + * calling @em fiber, permitting other fibers on the same thread to continue + * running. + * + * To specify an alternate allocator for the internal + * boost::fibers::promise: + * @code + * boost::fibers::asio::yield.with(alloc_instance) + * @endcode + * + * To bind a boost::system::error_code @a ec as well as using an + * alternate allocator: + * @code + * boost::fibers::asio::yield.with(alloc_instance)[ec] + * @endcode + */ +//[fibers_asio_yield_t +template< typename Allocator = std::allocator< void > > +class yield_t : public promise_completion_token< Allocator > { +public: + /// Construct with default-constructed allocator. + BOOST_CONSTEXPR yield_t() { + } +/*= // ... ways to use an alternate allocator or bind an error_code ...*/ +/*=};*/ +//] + + /// Construct using specified allocator. + explicit yield_t( Allocator const& allocator) : + promise_completion_token< Allocator >( allocator) { + } + + /// Specify an alternate allocator. + template< typename OtherAllocator > + yield_t< OtherAllocator > with( OtherAllocator const& allocator) const { + return yield_t< OtherAllocator >( allocator); + } + + /// Bind an error_code to suppress error exception. + yield_t operator[]( boost::system::error_code & ec) const { + // Return a copy because typical usage will be on our canonical + // instance. Don't leave the canonical instance with a dangling + // binding to a transient error_code! + yield_t tmp; + tmp.ec_ = & ec; + return tmp; + } +}; + +//[fibers_asio_yield +/// A special value, similar to std::nothrow. +BOOST_CONSTEXPR_OR_CONST yield_t<> yield; +//] + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include "detail/yield.hpp" + +#endif // BOOST_FIBERS_ASIO_YIELD_HPP diff --git a/examples/priority.cpp b/examples/priority.cpp new file mode 100644 index 00000000..0e58f2f5 --- /dev/null +++ b/examples/priority.cpp @@ -0,0 +1,337 @@ +// Copyright Nat Goodspeed 2014. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#include + +#include +#include +#include + +class Verbose: public boost::noncopyable { +public: + Verbose( std::string const& d, std::string const& s="stop") : + desc( d), + stop( s) { + std::cout << desc << " start" << std::endl; + } + + ~Verbose() { + std::cout << desc << ' ' << stop << std::endl; + } + +private: + std::string desc; + std::string stop; +}; + +//[priority_props +class priority_props : public boost::fibers::fiber_properties { +public: + priority_props( boost::fibers::context * ctx): + fiber_properties( ctx), /*< Your subclass constructor must accept a + [^[class_link context]*] and pass it to + the `fiber_properties` constructor. >*/ + priority_( 0) { + } + + int get_priority() const { + return priority_; /*< Provide read access methods at your own discretion. >*/ + } + + // Call this method to alter priority, because we must notify + // priority_scheduler of any change. + void set_priority( int p) { + /*< It's important to call notify() on any + change in a property that can affect the + scheduler's behavior. Therefore, such + modifications should only be performed + through an access method. >*/ + // Of course, it's only worth reshuffling the queue and all if we're + // actually changing the priority. + if ( p != priority_) { + priority_ = p; + notify(); + } + } + + // The fiber name of course is solely for purposes of this example + // program; it has nothing to do with implementing scheduler priority. + // This is a public data member -- not requiring set/get access methods -- + // because we need not inform the scheduler of any change. + std::string name; /*< A property that does not affect the scheduler does + not need access methods. >*/ +private: + int priority_; +}; +//] + +//[priority_scheduler +class priority_scheduler : public boost::fibers::sched_algorithm_with_properties< priority_props > { +private: + typedef boost::fibers::scheduler::ready_queue_t rqueue_t; + + rqueue_t rqueue_; + +public: + priority_scheduler() : + rqueue_() { + } + + // For a subclass of sched_algorithm_with_properties<>, it's important to + // override the correct awakened() overload. + /*<< You must override the [member_link sched_algorithm_with_properties..awakened] + method. This is how your scheduler receives notification of a + fiber that has become ready to run. >>*/ + virtual void awakened( boost::fibers::context * ctx, priority_props & props) { + int f_priority = props.get_priority(); /*< `props` is the instance of + priority_props associated + with the passed fiber `f`. >*/ + // With this scheduler, fibers with higher priority values are + // preferred over fibers with lower priority values. But fibers with + // equal priority values are processed in round-robin fashion. So when + // we're handed a new context*, put it at the end of the fibers + // with that same priority. In other words: search for the first fiber + // in the queue with LOWER priority, and insert before that one. + rqueue_t::iterator i( rqueue_.begin() ), e( rqueue_.end() ); + for ( ; i != e; ++i) { + if ( properties( & ( * i) ).get_priority() < f_priority) { + break; + } + } + // Now, whether or not we found a fiber with lower priority, + // insert this new fiber here. + rqueue_.insert( i, * ctx); +//<- + + std::cout << "awakened(" << props.name << "): "; + describe_ready_queue(); +//-> + } + + /*<< You must override the [member_link sched_algorithm_with_properties..pick_next] + method. This is how your scheduler actually advises the fiber manager + of the next fiber to run. >>*/ + virtual boost::fibers::context * pick_next() { + // if ready queue is empty, just tell caller + if ( rqueue_.empty() ) { + return nullptr; + } + boost::fibers::context * ctx( & rqueue_.front() ); + rqueue_.pop_front(); +//<- + std::cout << "pick_next() resuming " << properties( ctx).name << ": "; + describe_ready_queue(); +//-> + return ctx; + } + + /*<< You must override [member_link sched_algorithm_with_properties..ready_fibers] + to inform the fiber manager of the size of your ready queue. >>*/ + virtual bool has_ready_fibers() const noexcept { + return ! rqueue_.empty(); + } + + /*<< Overriding [member_link sched_algorithm_with_properties..property_change] + is optional. This override handles the case in which the running + fiber changes the priority of another ready fiber: a fiber already in + our queue. In that case, move the updated fiber within the queue. >>*/ + virtual void property_change( boost::fibers::context * ctx, priority_props & props) { + // Although our priority_props class defines multiple properties, only + // one of them (priority) actually calls notify() when changed. The + // point of a property_change() override is to reshuffle the ready + // queue according to the updated priority value. +//<- + std::cout << "property_change(" << props.name << '(' << props.get_priority() + << ")): "; +//-> + + // Find 'f' in the queue. Note that it might not be in our queue at + // all, if caller is changing the priority of (say) the running fiber. + bool found = false; + rqueue_t::iterator e( rqueue_.end() ); + for ( rqueue_t::iterator i( rqueue_.begin() ); i != e; ++i) { + if ( & ( * i) == ctx) { + // found the passed fiber in our list -- unlink it + found = true; + rqueue_.erase( i); + break; + } + } + + // It's possible to get a property_change() call for a fiber that is + // not on our ready queue. If it's not there, no need to move it: + // we'll handle it next time it hits awakened(). + if ( ! found) { + /*< Your `property_change()` override must be able to + handle the case in which the passed `f` is not in + your ready queue. It might be running, or it might be + blocked. >*/ +//<- + // hopefully user will distinguish this case by noticing that + // the fiber with which we were called does not appear in the + // ready queue at all + describe_ready_queue(); +//-> + return; + } + + // Here we know that f was in our ready queue, but we've unlinked it. + // We happen to have a method that will (re-)add a context* to + // the ready queue. + awakened( ctx, props); + } +//<- + + void describe_ready_queue() { + if ( rqueue_.empty() ) { + std::cout << "[empty]"; + } else { + const char * delim = ""; + for ( boost::fibers::context & f : rqueue_) { + priority_props & props( properties( & f) ); + std::cout << delim << props.name << '(' << props.get_priority() << ')'; + delim = ", "; + } + } + std::cout << std::endl; + } +//-> +}; +//] + +//[launch +template< typename Fn > +boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) { + boost::fibers::fiber fiber( func); + priority_props & props( fiber.properties< priority_props >() ); + props.name = name; + props.set_priority( priority); + return fiber; +} +//] + +void yield_fn() { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + for ( int i = 0; i < 3; ++i) { + std::cout << "fiber " << name << " yielding" << std::endl; + boost::this_fiber::yield(); + } +} + +void barrier_fn( boost::fibers::barrier & barrier) { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + std::cout << "fiber " << name << " waiting on barrier" << std::endl; + barrier.wait(); + std::cout << "fiber " << name << " yielding" << std::endl; + boost::this_fiber::yield(); +} + +//[change_fn +void change_fn( boost::fibers::fiber & other, + int other_priority, + boost::fibers::barrier& barrier) { + std::string name( boost::this_fiber::properties< priority_props >().name); + Verbose v( std::string("fiber ") + name); + +//<- + std::cout << "fiber " << name << " waiting on barrier" << std::endl; +//-> + barrier.wait(); + // We assume a couple things about 'other': + // - that it was also waiting on the same barrier + // - that it has lower priority than this fiber. + // If both are true, 'other' is now ready to run but is sitting in + // priority_scheduler's ready queue. Change its priority. + priority_props & other_props( + other.properties< priority_props >() ); +//<- + std::cout << "fiber " << name << " changing priority of " << other_props.name + << " to " << other_priority << std::endl; +//-> + other_props.set_priority( other_priority); +} +//] + +//[main +int main( int argc, char *argv[]) { + // make sure we use our priority_scheduler rather than default round_robin + boost::fibers::use_scheduling_algorithm< priority_scheduler >(); +/*= ...*/ +/*=}*/ +//] + Verbose v("main()"); + + // for clarity + std::cout << "main() setting name" << std::endl; +//[main_name + boost::this_fiber::properties< priority_props >().name = "main"; +//] + std::cout << "main() running tests" << std::endl; + + { + Verbose v("high-priority first", "stop\n"); + // verify that high-priority fiber always gets scheduled first + boost::fibers::fiber low( launch( yield_fn, "low", 1) ); + boost::fibers::fiber med( launch( yield_fn, "medium", 2) ); + boost::fibers::fiber hi( launch( yield_fn, "high", 3) ); + std::cout << "main: high.join()" << std::endl; + hi.join(); + std::cout << "main: medium.join()" << std::endl; + med.join(); + std::cout << "main: low.join()" << std::endl; + low.join(); + } + + { + Verbose v("same priority round-robin", "stop\n"); + // fibers of same priority are scheduled in round-robin order + boost::fibers::fiber a( launch( yield_fn, "a", 0) ); + boost::fibers::fiber b( launch( yield_fn, "b", 0) ); + boost::fibers::fiber c( launch( yield_fn, "c", 0) ); + std::cout << "main: a.join()" << std::endl; + a.join(); + std::cout << "main: b.join()" << std::endl; + b.join(); + std::cout << "main: c.join()" << std::endl; + c.join(); + } + + { + Verbose v("barrier wakes up all", "stop\n"); + // using a barrier wakes up all waiting fibers at the same time + boost::fibers::barrier barrier( 3); + boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) ); + boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) ); + boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) ); + std::cout << "main: low.join()" << std::endl; + low.join(); + std::cout << "main: medium.join()" << std::endl; + med.join(); + std::cout << "main: high.join()" << std::endl; + hi.join(); + } + + { + Verbose v("change priority", "stop\n"); + // change priority of a fiber in priority_scheduler's ready queue + boost::fibers::barrier barrier( 3); + boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) ); + boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) ); + boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) ); + std::cout << "main: a.join()" << std::endl; + std::cout << "main: a.join()" << std::endl; + a.join(); + std::cout << "main: b.join()" << std::endl; + b.join(); + std::cout << "main: c.join()" << std::endl; + c.join(); + } + + std::cout << "done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/examples/wait_stuff.cpp b/examples/wait_stuff.cpp new file mode 100644 index 00000000..df11a84d --- /dev/null +++ b/examples/wait_stuff.cpp @@ -0,0 +1,1006 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include // std::result_of +#include +#include +#include // std::shared_ptr +#include +#include +#include + +// These are wait_something() functions rather than when_something() +// functions. A big part of the point of the Fiber library is to model +// sequencing using the processor's instruction pointer rather than chains of +// callbacks. The future-oriented when_all() / when_any() functions are still +// based on chains of callbacks. With Fiber, we can do better. + +/***************************************************************************** +* Verbose +*****************************************************************************/ +class Verbose: boost::noncopyable { +public: + Verbose( std::string const& d): + desc( d) { + std::cout << desc << " start" << std::endl; + } + + ~Verbose() { + std::cout << desc << " stop" << std::endl; + } + +private: + const std::string desc; +}; + +/***************************************************************************** +* Runner and Example +*****************************************************************************/ +// collect and ultimately run every Example +class Runner { + typedef std::vector< std::pair< std::string, std::function< void() > > > function_list; + +public: + void add( std::string const& desc, std::function< void() > const& func) { + functions_.push_back( function_list::value_type( desc, func) ); + } + + void run() { + for ( function_list::value_type const& pair : functions_) { + Verbose v( pair.first); + pair.second(); + } + } + +private: + function_list functions_; +}; + +Runner runner; + +// Example allows us to embed Runner::add() calls at module scope +struct Example { + Example( Runner & runner, std::string const& desc, std::function< void() > const& func) { + runner.add( desc, func); + } +}; + +/***************************************************************************** +* example task functions +*****************************************************************************/ +//[wait_sleeper +template< typename T > +T sleeper_impl( T item, int ms, bool thrw = false) { + std::ostringstream descb, funcb; + descb << item; + std::string desc( descb.str() ); + funcb << " sleeper(" << item << ")"; + Verbose v( funcb.str() ); + + boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) ); + if ( thrw) { + throw std::runtime_error( desc); + } + return item; +} +//] + +inline +std::string sleeper( std::string const& item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +double sleeper( double item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +inline +int sleeper(int item, int ms, bool thrw = false) { + return sleeper_impl( item, ms, thrw); +} + +/***************************************************************************** +* Done +*****************************************************************************/ +//[wait_done +// Wrap canonical pattern for condition_variable + bool flag +struct Done { +private: + boost::fibers::condition_variable cond; + boost::fibers::mutex mutex; + bool ready = false; + +public: + typedef std::shared_ptr< Done > ptr; + + void wait() { + std::unique_lock< boost::fibers::mutex > lock( mutex); + while ( ! ready) { + cond.wait( lock); + } + } + + void notify() { + { + std::unique_lock< boost::fibers::mutex > lock( mutex); + ready = true; + } // release mutex + cond.notify_one(); + } +}; +//] + +/***************************************************************************** +* when_any, simple completion +*****************************************************************************/ +//[wait_first_simple_impl +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_first_simple_impl( Done::ptr) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +template< typename Fn, typename ... Fns > +void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) { + boost::fibers::fiber( [done, function](){ + function(); + done->notify(); + }).detach(); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate Done, launch tasks, wait for Done +//[wait_first_simple +template< typename ... Fns > +void wait_first_simple( Fns && ... functions) { + // Use shared_ptr because each function's fiber will bind it separately, + // and we're going to return before the last of them completes. + auto done( std::make_shared< Done >() ); + wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); + done->wait(); +} +//] + +// example usage +Example wfs( runner, "wait_first_simple()", [](){ +//[wait_first_simple_ex + wait_first_simple( + [](){ sleeper("wfs_long", 150); }, + [](){ sleeper("wfs_medium", 100); }, + [](){ sleeper("wfs_short", 50); }); +//] +}); + +/***************************************************************************** +* when_any, return value +*****************************************************************************/ +// When there's only one function, call this overload +//[wait_first_value_impl +template< typename T, typename Fn > +void wait_first_value_impl( std::shared_ptr< boost::fibers::bounded_channel< T > > channel, + Fn && function) { + boost::fibers::fiber( [channel, function](){ + // Ignore channel_op_status returned by push(): might be closed, might + // be full; we simply don't care. + channel->push( function() ); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_value_impl( std::shared_ptr< boost::fibers::bounded_channel< T > > channel, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_value_impl< T >( channel, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_value_impl< T >( channel, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_first_value +// Assume that all passed functions have the same return type. The return type +// of wait_first_value() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_value( Fn && function, Fns && ... functions) { + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::bounded_channel< return_t > channel_t; + // bounded_channel of size 1: only store the first value + auto channelp( std::make_shared< channel_t >( 1) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( channelp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( channelp->value_pop() ); + // close the channel: no subsequent push() has to succeed + channelp->close(); + return value; +} +//] + +// example usage +Example wfv( runner, "wait_first_value()", [](){ +//[wait_first_value_ex + std::string result = wait_first_value( + [](){ return sleeper("wfv_third", 150); }, + [](){ return sleeper("wfv_second", 100); }, + [](){ return sleeper("wfv_first", 50); }); + std::cout << "wait_first_value() => " << result << std::endl; + assert(result == "wfv_first"); +//] +}); + +/***************************************************************************** +* when_any, produce first outcome, whether result or exception +*****************************************************************************/ +// When there's only one function, call this overload. +//[wait_first_outcome_impl +template< typename T, typename Fn > +void wait_first_outcome_impl( std::shared_ptr< + boost::fibers::bounded_channel< + boost::fibers::future< T > > > channel, + Fn && function) { + boost::fibers::fiber( [channel, function](){ + // Instantiate a packaged_task to capture any exception thrown by + // function. + boost::fibers::packaged_task< T() > task( function); + // Immediately run this packaged_task on same fiber. We want + // function() to have completed BEFORE we push the future. + task(); + // Pass the corresponding future to consumer. Ignore channel_op_status + // returned by push(): might be closed, might be full; we simply don't + // care. + channel->push( task.get_future() ); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_first_outcome_impl( std::shared_ptr< boost::fibers::bounded_channel< + boost::fibers::future< T > > > channel, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_first_outcome_impl< T >( channel, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_first_outcome_impl< T >( channel, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +// Assume that all passed functions have the same return type. The return type +// of wait_first_outcome() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_outcome +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_outcome( Fn && function, Fns && ... functions) { + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::bounded_channel< future_t > channel_t; + // bounded_channel of size 1: only store the first future + auto channelp(std::make_shared< channel_t >( 1) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( channelp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // retrieve the first future + future_t future( channelp->value_pop() ); + // close the channel: no subsequent push() has to succeed + channelp->close(); + // either return value or throw exception + return future.get(); +} +//] + +// example usage +Example wfo( runner, "wait_first_outcome()", [](){ +//[wait_first_outcome_ex + std::string result = wait_first_outcome( + [](){ return sleeper("wfos_first", 50); }, + [](){ return sleeper("wfos_second", 100); }, + [](){ return sleeper("wfos_third", 150); }); + std::cout << "wait_first_outcome(success) => " << result << std::endl; + assert(result == "wfos_first"); + + std::string thrown; + try { + result = wait_first_outcome( + [](){ return sleeper("wfof_first", 50, true); }, + [](){ return sleeper("wfof_second", 100); }, + [](){ return sleeper("wfof_third", 150); }); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_outcome(fail) threw '" << thrown + << "'" << std::endl; + assert(thrown == "wfof_first"); +//] +}); + +/***************************************************************************** +* when_any, collect exceptions until success; throw exception_list if no +* success +*****************************************************************************/ +// define an exception to aggregate exception_ptrs; prefer +// std::exception_list (N4407 et al.) once that becomes available +//[exception_list +class exception_list : public std::runtime_error { +public: + exception_list( std::string const& what) : + std::runtime_error( what) { + } + + typedef std::vector< std::exception_ptr > bundle_t; + + // N4407 proposed std::exception_list API + typedef bundle_t::const_iterator iterator; + + std::size_t size() const noexcept { + return bundle_.size(); + } + + iterator begin() const noexcept { + return bundle_.begin(); + } + + iterator end() const noexcept { + return bundle_.end(); + } + + // extension to populate + void add( std::exception_ptr ep) { + bundle_.push_back( ep); + } + +private: + bundle_t bundle_; +}; +//] + +// Assume that all passed functions have the same return type. The return type +// of wait_first_success() is the return type of the first passed function. It is +// simply invalid to pass NO functions. +//[wait_first_success +template< typename Fn, typename ... Fns > +typename std::result_of< Fn() >::type +wait_first_success( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + // In this case, the value we pass through the channel is actually a + // future -- which is already ready. future can carry either a value or an + // exception. + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::bounded_channel< future_t > channel_t; + // make bounded_channel big enough to hold all results if need be + // (could use unbounded_channel this time, but let's just share + // wait_first_outcome_impl()) + auto channelp( std::make_shared< channel_t >( count) ); + // launch all the relevant fibers + wait_first_outcome_impl< return_t >( channelp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // instantiate exception_list, just in case + exception_list exceptions("wait_first_success() produced only errors"); + // retrieve up to 'count' results -- but stop there! + for ( std::size_t i = 0; i < count; ++i) { + // retrieve the next future + future_t future( channelp->value_pop() ); + // retrieve exception_ptr if any + std::exception_ptr error( future.get_exception_ptr() ); + // if no error, then yay, return value + if ( ! error) { + // close the channel: no subsequent push() has to succeed + channelp->close(); + // show caller the value we got + return future.get(); + } + + // error is non-null: collect + exceptions.add( error); + } + // We only arrive here when every passed function threw an exception. + // Throw our collection to inform caller. + throw exceptions; +} +//] + +// example usage +Example wfss( runner, "wait_first_success()", [](){ +//[wait_first_success_ex + std::string result = wait_first_success( + [](){ return sleeper("wfss_first", 50, true); }, + [](){ return sleeper("wfss_second", 100); }, + [](){ return sleeper("wfss_third", 150); }); + std::cout << "wait_first_success(success) => " << result << std::endl; + assert(result == "wfss_second"); +//] + + std::string thrown; + std::size_t count = 0; + try { + result = wait_first_success( + [](){ return sleeper("wfsf_first", 50, true); }, + [](){ return sleeper("wfsf_second", 100, true); }, + [](){ return sleeper("wfsf_third", 150, true); }); + } catch ( exception_list const& e) { + thrown = e.what(); + count = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_first_success(fail) threw '" << thrown << "': " + << count << " errors" << std::endl; + assert(thrown == "wait_first_success() produced only errors"); + assert(count == 3); +}); + +/***************************************************************************** +* when_any, heterogeneous +*****************************************************************************/ +//[wait_first_value_het +// No need to break out the first Fn for interface function: let the compiler +// complain if empty. +// Our functions have different return types, and we might have to return any +// of them. Use a variant, expanding std::result_of::type for each Fn in +// parameter pack. +template< typename ... Fns > +boost::variant< typename std::result_of< Fns() >::type ... > +wait_first_value_het( Fns && ... functions) { + // Use bounded_channel>; see remarks above. + typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t; + typedef boost::fibers::bounded_channel< return_t > channel_t; + // bounded_channel of size 1: only store the first value + auto channelp( std::make_shared< channel_t >( 1) ); + // launch all the relevant fibers + wait_first_value_impl< return_t >( channelp, + std::forward< Fns >( functions) ... ); + // retrieve the first value + return_t value( channelp->value_pop() ); + // close the channel: no subsequent push() has to succeed + channelp->close(); + return value; +} +//] + +// example usage +Example wfvh( runner, "wait_first_value_het()", [](){ +//[wait_first_value_het_ex + boost::variant< std::string, double, int > result = + wait_first_value_het( + [](){ return sleeper("wfvh_third", 150); }, + [](){ return sleeper(3.14, 100); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_first_value_het() => " << result << std::endl; + assert(boost::get< int >( result) == 17); +//] +}); + +/***************************************************************************** +* when_all, simple completion +*****************************************************************************/ +// Degenerate case: when there are no functions to wait for, return +// immediately. +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) { +} + +// When there's at least one function to wait for, launch it and recur to +// process the rest. +//[wait_all_simple_impl +template< typename Fn, typename ... Fns > +void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier, + Fn && function, Fns && ... functions) { + boost::fibers::fiber( [barrier, function](){ + function(); + barrier->wait(); + }).detach(); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); +} +//] + +// interface function: instantiate barrier, launch tasks, wait for barrier +//[wait_all_simple +template< typename ... Fns > +void wait_all_simple( Fns && ... functions) { + std::size_t count( sizeof ... ( functions) ); + // Initialize a barrier(count+1) because we'll immediately wait on it. We + // don't want to wake up until 'count' more fibers wait on it. Even though + // we'll stick around until the last of them completes, use shared_ptr + // anyway because it's easier to be confident about lifespan issues. + auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) ); + wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); + barrier->wait(); +} +//] + +// example usage +Example was( runner, "wait_all_simple()", [](){ +//[wait_all_simple_ex + wait_all_simple( + [](){ sleeper("was_long", 150); }, + [](){ sleeper("was_medium", 100); }, + [](){ sleeper("was_short", 50); }); +//] +}); + +/***************************************************************************** +* when_all, return values +*****************************************************************************/ +//[wait_nchannel +// Introduce a channel facade that closes the channel once a specific number +// of items has been pushed. This allows an arbitrary consumer to read until +// 'closed' without itself having to count items. +template< typename T > +class nchannel { +public: + nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp, + std::size_t lm): + channel_( cp), + limit_( lm) { + assert(channel_); + if ( 0 == limit_) { + channel_->close(); + } + } + + boost::fibers::channel_op_status push( T && va) { + boost::fibers::channel_op_status ok = + channel_->push( std::forward< T >( va) ); + if ( ok == boost::fibers::channel_op_status::success && + --limit_ == 0) { + // after the 'limit_'th successful push, close the channel + channel_->close(); + } + return ok; + } + +private: + std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_; + std::size_t limit_; +}; +//] + +// When there's only one function, call this overload +//[wait_all_values_impl +template< typename T, typename Fn > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel, + Fn && function) { + boost::fibers::fiber( [channel, function](){ + channel->push(function()); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_all_values_impl< T >( channel, std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_all_values_impl< T >( channel, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_all_values_source +// Return a shared_ptr> from which the caller can +// retrieve each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > > +wait_all_values_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::unbounded_channel< return_t > channel_t; + // make the channel + auto channelp( std::make_shared< channel_t >() ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_all_values_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return channelp; +} +//] + +// When all passed functions have completed, return vector containing +// collected results. Assume that all passed functions have the same return +// type. It is simply invalid to pass NO functions. +//[wait_all_values +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_values( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel = + wait_all_values_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // fill results vector + return_t value; + while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { + results.push_back( value); + } + // return vector to caller + return results; +} +//] + +Example wav( runner, "wait_all_values()", [](){ +//[wait_all_values_source_ex + std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel = + wait_all_values_source( + [](){ return sleeper("wavs_third", 150); }, + [](){ return sleeper("wavs_second", 100); }, + [](){ return sleeper("wavs_first", 50); }); + std::string value; + while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { + std::cout << "wait_all_values_source() => '" << value + << "'" << std::endl; + } +//] + +//[wait_all_values_ex + std::vector< std::string > values = + wait_all_values( + [](){ return sleeper("wav_late", 150); }, + [](){ return sleeper("wav_middle", 100); }, + [](){ return sleeper("wav_early", 50); }); +//] + std::cout << "wait_all_values() =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +}); + +/***************************************************************************** +* when_all, throw first exception +*****************************************************************************/ +// When there's only one function, call this overload +//[wait_all_until_error_impl +template< typename T, typename Fn > +void wait_all_until_error_impl( std::shared_ptr< nchannel< boost::fibers::future< T > > > channel, + Fn && function) { + boost::fibers::fiber( [channel, function](){ + // Instantiate a packaged_task to capture any exception thrown by + // function. + boost::fibers::packaged_task< T() > task( function); + // Immediately run this packaged_task on same fiber. We want + // function() to have completed BEFORE we push the future. + task(); + // Pass the corresponding future to consumer. + channel->push( task.get_future() ); + }).detach(); +} +//] + +// When there are two or more functions, call this overload +template< typename T, typename Fn0, typename Fn1, typename ... Fns > +void wait_all_until_error_impl( std::shared_ptr< nchannel< boost::fibers::future< T > > > channel, + Fn0 && function0, + Fn1 && function1, + Fns && ... functions) { + // process the first function using the single-function overload + wait_all_until_error_impl< T >( channel, + std::forward< Fn0 >( function0) ); + // then recur to process the rest + wait_all_until_error_impl< T >( channel, + std::forward< Fn1 >( function1), + std::forward< Fns >( functions) ... ); +} + +//[wait_all_until_error_source +// Return a shared_ptr>> from which the caller can +// get() each new result as it arrives, until 'closed'. +template< typename Fn, typename ... Fns > +std::shared_ptr< + boost::fibers::unbounded_channel< + boost::fibers::future< + typename std::result_of< Fn() >::type > > > +wait_all_until_error_source( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef boost::fibers::future< return_t > future_t; + typedef boost::fibers::unbounded_channel< future_t > channel_t; + // make the channel + auto channelp( std::make_shared< channel_t >() ); + // and make an nchannel facade to close it after 'count' items + auto ncp( std::make_shared< nchannel< future_t > >( channelp, count) ); + // pass that nchannel facade to all the relevant fibers + wait_all_until_error_impl< return_t >( ncp, + std::forward< Fn >( function), + std::forward< Fns >( functions) ... ); + // then return the channel for consumer + return channelp; +} +//] + +// When all passed functions have completed, return vector containing +// collected results, or throw the first exception thrown by any of the passed +// functions. Assume that all passed functions have the same return type. It +// is simply invalid to pass NO functions. +//[wait_all_until_error +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_until_error( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + + // get channel + std::shared_ptr< + boost::fibers::unbounded_channel< future_t > > channel( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results vector + future_t future; + while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { + results.push_back( future.get() ); + } + // return vector to caller + return results; +} +//] + +Example waue( runner, "wait_all_until_error()", [](){ +//[wait_all_until_error_source_ex + typedef boost::fibers::future< std::string > future_t; + std::shared_ptr< boost::fibers::unbounded_channel< future_t > > channel = + wait_all_until_error_source( + [](){ return sleeper("wauess_third", 150); }, + [](){ return sleeper("wauess_second", 100); }, + [](){ return sleeper("wauess_first", 50); }); + future_t future; + while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(success) => '" << value + << "'" << std::endl; + } +//] + + channel = wait_all_until_error_source( + [](){ return sleeper("wauesf_third", 150); }, + [](){ return sleeper("wauesf_second", 100, true); }, + [](){ return sleeper("wauesf_first", 50); }); +//[wait_all_until_error_ex + std::string thrown; +//<- + try { + while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { + std::string value( future.get() ); + std::cout << "wait_all_until_error_source(fail) => '" << value + << "'" << std::endl; + } + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error_source(fail) threw '" << thrown + << "'" << std::endl; + + thrown.clear(); +//-> + try { + std::vector< std::string > values = wait_all_until_error( + [](){ return sleeper("waue_late", 150); }, + [](){ return sleeper("waue_middle", 100, true); }, + [](){ return sleeper("waue_early", 50); }); +//<- + std::cout << "wait_all_until_error(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; +//-> + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_until_error(fail) threw '" << thrown + << "'" << std::endl; +//] +}); + +/***************************************************************************** +* when_all, collect exceptions +*****************************************************************************/ +// When all passed functions have succeeded, return vector containing +// collected results, or throw exception_list containing all exceptions thrown +// by any of the passed functions. Assume that all passed functions have the +// same return type. It is simply invalid to pass NO functions. +//[wait_all_collect_errors +template< typename Fn, typename ... Fns > +std::vector< typename std::result_of< Fn() >::type > +wait_all_collect_errors( Fn && function, Fns && ... functions) { + std::size_t count( 1 + sizeof ... ( functions) ); + typedef typename std::result_of< Fn() >::type return_t; + typedef typename boost::fibers::future< return_t > future_t; + typedef std::vector< return_t > vector_t; + vector_t results; + results.reserve( count); + exception_list exceptions("wait_all_collect_errors() exceptions"); + + // get channel + std::shared_ptr< + boost::fibers::unbounded_channel< future_t > > channel( + wait_all_until_error_source( std::forward< Fn >( function), + std::forward< Fns >( functions) ... ) ); + // fill results and/or exceptions vectors + future_t future; + while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { + std::exception_ptr exp = future.get_exception_ptr(); + if ( ! exp) { + results.push_back( future.get() ); + } else { + exceptions.add( exp); + } + } + // if there were any exceptions, throw + if ( exceptions.size() ) { + throw exceptions; + } + // no exceptions: return vector to caller + return results; +} +//] + +Example wace( runner, "wait_all_collect_errors()", [](){ + std::vector< std::string > values = wait_all_collect_errors( + [](){ return sleeper("waces_late", 150); }, + [](){ return sleeper("waces_middle", 100); }, + [](){ return sleeper("waces_early", 50); }); + std::cout << "wait_all_collect_errors(success) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + + std::string thrown; + std::size_t errors = 0; + try { + values = wait_all_collect_errors( + [](){ return sleeper("wacef_late", 150, true); }, + [](){ return sleeper("wacef_middle", 100, true); }, + [](){ return sleeper("wacef_early", 50); }); + std::cout << "wait_all_collect_errors(fail) =>"; + for ( std::string const& v : values) { + std::cout << " '" << v << "'"; + } + std::cout << std::endl; + } catch ( exception_list const& e) { + thrown = e.what(); + errors = e.size(); + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_collect_errors(fail) threw '" << thrown + << "': " << errors << " errors" << std::endl; +}); + +/***************************************************************************** +* when_all, heterogeneous +*****************************************************************************/ +//[wait_all_members_get +template< typename Result, typename ... Futures > +Result wait_all_members_get( Futures && ... futures) { + // Fetch the results from the passed futures into Result's initializer + // list. It's true that the get() calls here will block the implicit + // iteration over futures -- but that doesn't matter because we won't be + // done until the slowest of them finishes anyway. As results are + // processed in argument-list order rather than order of completion, the + // leftmost get() to throw an exception will cause that exception to + // propagate to the caller. + return Result{ futures.get() ... }; +} +//] + +//[wait_all_members +// Explicitly pass Result. This can be any type capable of being initialized +// from the results of the passed functions, such as a struct. +template< typename Result, typename ... Fns > +Result wait_all_members( Fns && ... functions) { + // Run each of the passed functions on a separate fiber, passing all their + // futures to helper function for processing. + return wait_all_members_get< Result >( + boost::fibers::async( std::forward< Fns >( functions) ) ... ); +} +//] + +// used by following example +//[wait_Data +struct Data { + std::string str; + double inexact; + int exact; + + friend std::ostream& operator<<( std::ostream& out, Data const& data) +/*= ...*/ +//<- + { + return out << "Data{str='" << data.str << "', inexact=" << data.inexact + << ", exact=" << data.exact << "}"; + } +//-> +}; +//] + +// example usage +Example wam( runner, "wait_all_members()", [](){ +//[wait_all_members_data_ex + Data data = wait_all_members< Data >( + [](){ return sleeper("wams_left", 100); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50); }); + std::cout << "wait_all_members(success) => " << data << std::endl; +//] + + std::string thrown; + try { + data = wait_all_members< Data >( + [](){ return sleeper("wamf_left", 100, true); }, + [](){ return sleeper(3.14, 150); }, + [](){ return sleeper(17, 50, true); }); + std::cout << "wait_all_members(fail) => " << data << std::endl; + } catch ( std::exception const& e) { + thrown = e.what(); + } + std::cout << "wait_all_members(fail) threw '" << thrown + << '"' << std::endl; + +//[wait_all_members_vector_ex + // If we don't care about obtaining results as soon as they arrive, and we + // prefer a result vector in passed argument order rather than completion + // order, wait_all_members() is another possible implementation of + // wait_all_until_error(). + auto strings = wait_all_members< std::vector< std::string > >( + [](){ return sleeper("wamv_left", 150); }, + [](){ return sleeper("wamv_middle", 100); }, + [](){ return sleeper("wamv_right", 50); }); + std::cout << "wait_all_members() =>"; + for ( std::string const& str : strings) { + std::cout << " '" << str << "'"; + } + std::cout << std::endl; +//] +}); + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + runner.run(); + std::cout << "done." << std::endl; + return EXIT_SUCCESS; +} diff --git a/examples/work_sharing.cpp b/examples/work_sharing.cpp new file mode 100644 index 00000000..1784a897 --- /dev/null +++ b/examples/work_sharing.cpp @@ -0,0 +1,178 @@ +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/***************************************************************************** +* shared_ready_queue scheduler +*****************************************************************************/ +// This simple scheduler is like round_robin, except that it shares a common +// ready queue among all participating threads. A thread participates in this +// pool by executing use_scheduling_algorithm() before any +// other Boost.Fiber operation. +class shared_ready_queue : public boost::fibers::sched_algorithm { +private: + typedef std::unique_lock< std::mutex > lock_t; + typedef std::queue< boost::fibers::context * > rqueue_t; + + // The important point about this ready queue is that it's a class static, + // common to all instances of shared_ready_queue. + static rqueue_t rqueue_; + + // so is this mutex + static std::mutex mtx_; + + // Reserve a separate, scheduler-specific slot for this thread's main + // fiber. When we're passed the main fiber, stash it there instead of in + // the shared queue: it would be Bad News for thread B to retrieve and + // attempt to execute thread A's main fiber. This slot might be empty + // (nullptr) or full: pick_next() must only return the main fiber's + // context* after it has been passed to awakened(). + boost::fibers::context * main_ctx_; + boost::fibers::context * dispatcher_ctx_; + +public: + shared_ready_queue() : + main_ctx_( nullptr), + dispatcher_ctx_( nullptr) { + } + + virtual void awakened( boost::fibers::context * ctx) { + BOOST_ASSERT( nullptr != ctx); + + // recognize when we're passed this thread's main fiber + if ( ctx->is_main_context() ) { + // never put this thread's main fiber on the queue + // stash it in separate slot + main_ctx_ = ctx; + // recognize when we're passed this thread's dispatcher fiber + } else if ( ctx->is_dispatcher_context() ) { + // never put this thread's main fiber on the queue + // stash it in separate slot + dispatcher_ctx_ = ctx; + } else { + ctx->worker_unlink(); + ctx->set_scheduler( nullptr); + // ordinary fiber, enqueue on shared queue + lock_t lock( mtx_); + rqueue_.push( ctx); + } + } + + virtual boost::fibers::context * pick_next() { + lock_t lock( mtx_); + boost::fibers::context * victim( nullptr); + if ( ! rqueue_.empty() ) { + // good, we have an item in the ready queue, pop it + victim = rqueue_.front(); + rqueue_.pop(); + BOOST_ASSERT( nullptr != victim); + } else if ( nullptr != main_ctx_) { + // nothing in the ready queue, return main_ctx_ + victim = main_ctx_; + // once we've returned main_ctx_, clear the slot + main_ctx_ = nullptr; + } else if ( nullptr != dispatcher_ctx_) { + // nothing in the ready queue, return dispatcher_ctx_ + victim = dispatcher_ctx_; + // once we've returned dispatcher_ctx_, clear the slot + dispatcher_ctx_ = nullptr; + } + return victim; + } + + virtual bool has_ready_fibers() const noexcept { + lock_t lock( mtx_); + return ! rqueue_.empty() || nullptr != main_ctx_; + } +}; + +shared_ready_queue::rqueue_t shared_ready_queue::rqueue_; +std::mutex shared_ready_queue::mtx_; + +/***************************************************************************** +* example fiber function +*****************************************************************************/ +void whatevah( char me) { + std::thread::id my_thread = std::this_thread::get_id(); + { + std::ostringstream buffer; + //buffer << "fiber " << me << " started on thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + for ( unsigned i = 0; i < 10; ++i) { + boost::this_fiber::yield(); + std::thread::id new_thread = std::this_thread::get_id(); + if ( new_thread != my_thread) { + my_thread = new_thread; + std::ostringstream buffer; + //buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + } +} + +/***************************************************************************** +* example thread function +*****************************************************************************/ +// Wait until all running fibers have completed. This works because we happen +// to know that all example fibers use yield(), which leaves them in ready +// state. A fiber blocked on a synchronization object is invisible to +// ready_fibers(). +void drain() { + // THIS fiber is running, so won't be counted among "ready" fibers + while ( boost::fibers::has_ready_fibers() ) { + boost::this_fiber::yield(); + } +} + +void thread() { + boost::fibers::use_scheduling_algorithm< shared_ready_queue >(); + drain(); +} + +/***************************************************************************** +* main() +*****************************************************************************/ +int main( int argc, char *argv[]) { + // use shared_ready_queue for main thread too, so we launch new fibers + // into shared pool + boost::fibers::use_scheduling_algorithm< shared_ready_queue >(); + + for ( int i = 0; i < 10; ++i) { + // launch a number of fibers + for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { + boost::fibers::fiber([c](){ whatevah( c); }).detach(); + } + + // launch a couple threads to help process them + std::thread threads[] = { + std::thread( thread), + std::thread( thread), + std::thread( thread), + std::thread( thread), + std::thread( thread) + }; + + // drain running fibers + drain(); + + // wait for threads to terminate + for ( std::thread & t : threads) { + t.join(); + } + } + + return EXIT_SUCCESS; +} +