From 3b9c8f0b46de643ff95a1c878d9e46417783ab5b Mon Sep 17 00:00:00 2001 From: Nat Goodspeed Date: Tue, 5 Apr 2016 21:17:35 -0400 Subject: [PATCH] Add autoecho.cpp to drive echo client/server automatically. --- examples/Jamfile.v2 | 1 + examples/asio/autoecho.cpp | 268 +++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 examples/asio/autoecho.cpp diff --git a/examples/Jamfile.v2 b/examples/Jamfile.v2 index 405db00a..ed780637 100644 --- a/examples/Jamfile.v2 +++ b/examples/Jamfile.v2 @@ -41,6 +41,7 @@ exe work_sharing : work_sharing.cpp ; exe work_stealing : work_stealing.cpp ; exe asio/echo : asio/echo.cpp ; +exe asio/autoecho : asio/autoecho.cpp ; exe asio/ps/publisher : asio/ps/publisher.cpp ; exe asio/ps/server : asio/ps/server.cpp ; exe asio/ps/subscriber : asio/ps/subscriber.cpp ; diff --git a/examples/asio/autoecho.cpp b/examples/asio/autoecho.cpp new file mode 100644 index 00000000..0d3cb3c6 --- /dev/null +++ b/examples/asio/autoecho.cpp @@ -0,0 +1,268 @@ +// Copyright 2003-2013 Christopher M. Kohlhoff +// Copyright Oliver Kowalke, Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include "round_robin.hpp" +#include "yield.hpp" + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr< tcp::socket > socket_ptr; + +const char* const alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + +/***************************************************************************** +* thread names +*****************************************************************************/ +class ThreadNames { +private: + std::map names_{}; + const char* next_{ alpha }; + std::mutex mtx_{}; + +public: + ThreadNames() = default; + + std::string lookup() { + std::unique_lock lk( mtx_); + auto this_id( std::this_thread::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + BOOST_ASSERT( *next_); + std::string name(1, *next_++ ); + names_[ this_id ] = name; + return name; + } +}; + +ThreadNames thread_names; + +/***************************************************************************** +* fiber names +*****************************************************************************/ +class FiberNames { +private: + std::map names_{}; + unsigned next_{ 0 }; + boost::fibers::mutex mtx_{}; + +public: + FiberNames() = default; + + std::string lookup() { + std::unique_lock lk( mtx_); + auto this_id( boost::this_fiber::get_id() ); + auto found = names_.find( this_id ); + if ( found != names_.end() ) { + return found->second; + } + std::ostringstream out; + // Bake into the fiber's name the thread name on which we first + // lookup() its ID, to be able to spot when a fiber hops between + // threads. + out << thread_names.lookup() << next_++; + std::string name( out.str() ); + names_[ this_id ] = name; + return name; + } +}; + +FiberNames fiber_names; + +std::string tag() { + std::ostringstream out; + out << "Thread " << thread_names.lookup() << ": " + << std::setw(4) << fiber_names.lookup() << std::setw(0); + return out.str(); +} + +/***************************************************************************** +* message printing +*****************************************************************************/ +void print_( std::ostream& out) { + out << '\n'; +} + +template < typename T, typename... Ts > +void print_( std::ostream& out, T const& arg, Ts const&... args) { + out << arg; + print_(out, args...); +} + +template < typename... T > +void print( T const&... args ) { + std::ostringstream buffer; + print_( buffer, args...); + std::cout << buffer.str() << std::flush; +} + +/***************************************************************************** +* fiber function per server connection +*****************************************************************************/ +void session( socket_ptr sock) { + try { + for (;;) { + char data[max_length]; + boost::system::error_code ec; + std::size_t length = sock->async_read_some( + boost::asio::buffer( data), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), " : handled: ", std::string(data, length)); + boost::asio::async_write( + * sock, + boost::asio::buffer( data, length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + break; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + } + print( tag(), " : connection closed"); + } catch ( boost::fibers::fiber_interrupted const&) { + print( tag(), " : interrupted"); + } catch ( std::exception const& ex) { + print( tag(), " : caught exception : ", ex.what()); + } +} + +/***************************************************************************** +* listening server +*****************************************************************************/ +void server( boost::asio::io_service & io_svc) { + print( tag(), " : echo-server started"); + try { + tcp::acceptor a( io_svc, tcp::endpoint( tcp::v4(), 9999) ); + for (;;) { + socket_ptr socket( new tcp::socket( io_svc) ); + boost::system::error_code ec; + a.async_accept( + * socket, + boost::fibers::asio::yield[ec]); + if ( ec) { + throw boost::system::system_error( ec); //some other error + } else { + boost::fibers::fiber( session, socket).detach(); + } + } + } catch ( boost::fibers::fiber_interrupted const&) { + print( tag(), " : interrupted"); + } catch ( std::exception const& ex) { + print( tag(), " : catched exception : ", ex.what()); + } +} + +/***************************************************************************** +* fiber function per client +*****************************************************************************/ +void client( boost::asio::io_service & io_svc, boost::fibers::barrier& barrier, + unsigned iterations) { + print( tag(), " : echo-client started"); + for (unsigned count = 0; count < iterations; ++count) { + tcp::resolver resolver( io_svc); + tcp::resolver::query query( tcp::v4(), "127.0.0.1", "9999"); + tcp::resolver::iterator iterator = resolver.resolve( query); + tcp::socket s( io_svc); + boost::asio::connect( s, iterator); + for (unsigned msg = 0; msg < 1; ++msg) { + std::ostringstream msgbuf; + msgbuf << "from " << fiber_names.lookup() << " " << count << "." << msg; + std::string message(msgbuf.str()); + print( tag(), " : Sending: ", message); + boost::system::error_code ec; + boost::asio::async_write( + s, + boost::asio::buffer( message), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + char reply[max_length]; + size_t reply_length = s.async_read_some( + boost::asio::buffer( reply, max_length), + boost::fibers::asio::yield[ec]); + if ( ec == boost::asio::error::eof) { + return; //connection closed cleanly by peer + } else if ( ec) { + throw boost::system::system_error( ec); //some other error + } + print( tag(), " : Reply : ", std::string( reply, reply_length)); + } + } + print( tag(), " : done"); + // done with all iterations, wait for rest of client fibers + if ( barrier.wait()) { + // exactly one barrier.wait() call returns true + // we're the lucky one + print( tag(), " : stopping io_service"); + io_svc.stop(); + } +} + +/***************************************************************************** +* main +*****************************************************************************/ +int main( int argc, char* argv[]) { + try { + boost::asio::io_service io_svc; + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); + // server + boost::fibers::fiber f( + server, boost::ref( io_svc) ); + // client + const unsigned iterations = 20; + const unsigned clients = 3; + boost::fibers::barrier barrier(clients); + for (unsigned c = 0; c < clients; ++c) { + boost::fibers::fiber( + client, boost::ref( io_svc), boost::ref( barrier), + iterations ).detach(); + } + // run io_service in two threads + std::thread t([&io_svc](){ + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); + print( "Thread ", thread_names.lookup(), ": started"); + io_svc.run(); + print( "Thread ", thread_names.lookup(), ": stopping"); + }); + io_svc.run(); + print( tag(), " : back from io_service::run(), waiting for thread"); + t.join(); + print( tag(), " : back from thread.join(), waiting for server fiber"); + f.interrupt(); + f.join(); + print( "done."); + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + print("Exception: ", e.what(), "\n"); + } + + return EXIT_FAILURE; +}