From ffffd322bbb5e71493f29024f2ffaf8974f7c5c0 Mon Sep 17 00:00:00 2001 From: oliver Kowalke Date: Tue, 1 Mar 2016 08:56:15 +0100 Subject: [PATCH] imake asio example thread-safe --- examples/asio/detail/yield.hpp | 4 +++ examples/asio/echo.cpp | 60 +++++++++++++++++++------------ examples/asio/round_robin.hpp | 65 +++++++++++++++++++++++++++------- 3 files changed, 94 insertions(+), 35 deletions(-) diff --git a/examples/asio/detail/yield.hpp b/examples/asio/detail/yield.hpp index b49d6822..68cef5d1 100644 --- a/examples/asio/detail/yield.hpp +++ b/examples/asio/detail/yield.hpp @@ -33,6 +33,7 @@ public: { * ec_ = boost::system::error_code(); * value_ = std::move( t); + boost::fibers::context::active()->migrate( ctx_); boost::fibers::context::active()->set_ready( ctx_); } @@ -40,6 +41,7 @@ public: { * ec_ = ec; * value_ = std::move( t); + boost::fibers::context::active()->migrate( ctx_); boost::fibers::context::active()->set_ready( ctx_); } @@ -62,12 +64,14 @@ public: void operator()() { * ec_ = boost::system::error_code(); + boost::fibers::context::active()->migrate( ctx_); boost::fibers::context::active()->set_ready( ctx_); } void operator()( boost::system::error_code const& ec) { * ec_ = ec; + boost::fibers::context::active()->migrate( ctx_); boost::fibers::context::active()->set_ready( ctx_); } diff --git a/examples/asio/echo.cpp b/examples/asio/echo.cpp index 21b8950f..41c9b269 100644 --- a/examples/asio/echo.cpp +++ b/examples/asio/echo.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -30,7 +31,6 @@ typedef boost::shared_ptr< tcp::socket > socket_ptr; void session( socket_ptr sock) { boost::fibers::fiber::id id = boost::this_fiber::get_id(); - std::cout << "fiber " << id << " : echo-handler started" << std::endl; try { for (;;) { char data[max_length]; @@ -41,8 +41,7 @@ void session( socket_ptr sock) { if ( ec == boost::asio::error::eof) { break; //connection closed cleanly by peer } else if ( ec) { - std::cerr << "fiber " << id << " : error occured : " << ec.message() << std::endl; - return; + throw boost::system::system_error( ec); //some other error } boost::asio::async_write( * sock, @@ -55,40 +54,51 @@ void session( socket_ptr sock) { } } } catch ( boost::fibers::fiber_interrupted const&) { - std::cout << "fiber " << id << " : interrupted" << std::endl; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : interrupted" << std::endl; + std::cerr << buffer.str() << std::flush; } catch ( std::exception const& ex) { - std::cout << "fiber " << id << " : catched exception : " << ex.what() << std::endl; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : catched exception : " << ex.what() << std::endl; + std::cerr << buffer.str() << std::flush; } - std::cout << "fiber " << id << " terminates" << std::endl; } void server( boost::asio::io_service & io_svc) { boost::fibers::fiber::id id = boost::this_fiber::get_id(); - std::cout << "fiber " << id << " : echo-server started" << std::endl; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : echo-server started" << std::endl; + std::cout << buffer.str() << std::flush; try { tcp::acceptor a( io_svc, tcp::endpoint( tcp::v4(), 9999) ); for (;;) { socket_ptr socket( new tcp::socket( io_svc) ); boost::system::error_code ec; - std::cout << "fiber " << id << " : accept new connection" << std::endl; a.async_accept( * socket, boost::fibers::asio::yield[ec]); if ( ec) { - std::cerr << "fiber " << id << " : error occured : " << ec.message() << std::endl; + throw boost::system::system_error( ec); //some other error } else { boost::fibers::fiber( session, socket).detach(); } } } catch ( boost::fibers::fiber_interrupted const&) { - std::cout << "fiber " << id << " : interrupted" << std::endl; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : interrupted" << std::endl; + std::cerr << buffer.str() << std::flush; } catch ( std::exception const& ex) { - std::cout << "fiber " << id << " : catched exception : " << ex.what() << std::endl; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : catched exception : " << ex.what() << std::endl; + std::cerr << buffer.str() << std::flush; } - std::cout << "fiber " << id << " terminates" << std::endl; } void client( boost::asio::io_service & io_svc) { + boost::fibers::fiber::id id = boost::this_fiber::get_id(); + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : echo-client started" << std::endl; + std::cout << buffer.str() << std::flush; tcp::resolver resolver( io_svc); tcp::resolver::query query( tcp::v4(), "127.0.0.1", "9999"); tcp::resolver::iterator iterator = resolver.resolve( query); @@ -96,7 +106,9 @@ void client( boost::asio::io_service & io_svc) { boost::asio::connect( s, iterator); for (;;) { char request[max_length]; - std::cout << "Enter message: "; + std::ostringstream buffer; + buffer << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : Enter message: "; + std::cout << buffer.str() << std::flush; std::cin.getline( request, max_length); boost::system::error_code ec; size_t request_length = std::strlen( request); @@ -118,27 +130,31 @@ void client( boost::asio::io_service & io_svc) { } else if ( ec) { throw boost::system::system_error( ec); //some other error } - std::cout << "Reply is: "; - std::cout.write( reply, reply_length); - std::cout << std::endl; + std::ostringstream result; + result << "tid=" << std::this_thread::get_id() << ", fid=" << id << " : Reply is: "; + result.write( reply, reply_length); + result << std::endl; + std::cout << result.str() << std::flush; } } int main( int argc, char* argv[]) { try { - boost::fibers::fiber::id id = boost::this_fiber::get_id(); - std::cout << "fiber " << id << " : (main-fiber) started" << std::endl; boost::asio::io_service io_svc; boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); - // server fiber + // server boost::fibers::fiber f( server, boost::ref( io_svc) ); - // client fiber + // client boost::fibers::fiber( client, boost::ref( io_svc) ).detach(); - // run io_service + // run io_service in two threads + std::thread t([&io_svc](){ + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); + io_svc.run(); + }); io_svc.run(); - std::cout << "fiber " << id << " (main-fiber) terminates" << std::endl; + t.join(); std::cout << "done." << std::endl; return EXIT_SUCCESS; } catch ( std::exception const& e) { diff --git a/examples/asio/round_robin.hpp b/examples/asio/round_robin.hpp index 533d4185..35b2084e 100644 --- a/examples/asio/round_robin.hpp +++ b/examples/asio/round_robin.hpp @@ -8,6 +8,8 @@ #include #include +#include +#include #include #include @@ -30,11 +32,18 @@ namespace boost { namespace fibers { namespace asio { +typedef std::unique_lock< std::mutex > lock_t; + class round_robin : public boost::fibers::sched_algorithm { private: + typedef std::queue< boost::fibers::context * > rqueue_t; + + static rqueue_t rqueue_; + static std::mutex rqueue_mtx_; + boost::asio::io_service & io_svc_; boost::asio::steady_timer suspend_timer_; - boost::fibers::scheduler::ready_queue_t ready_queue_{}; + rqueue_t local_queue_{}; boost::fibers::mutex mtx_{}; boost::fibers::condition_variable cnd_{}; std::size_t counter_{ 0 }; @@ -61,7 +70,7 @@ public: round_robin( boost::asio::io_service & io_svc) : io_svc_( io_svc), suspend_timer_( io_svc_) { - boost::asio::add_service< service >( io_svc_, new service( io_svc_) ); + boost::asio::use_service< service >( io_svc_); io_svc_.post([this]() mutable { while ( ! io_svc_.stopped() ) { if ( has_ready_fibers() ) { @@ -84,29 +93,57 @@ public: void awakened( boost::fibers::context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( ! ctx->ready_is_linked() ); - ctx->ready_link( ready_queue_); - if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { - ++counter_; + if ( ctx->is_context( boost::fibers::type::pinned_context) ) { /*< + recognize when we're passed this thread's main fiber (or an + implicit library helper fiber): never put those on the shared + queue + >*/ + local_queue_.push( ctx); + if ( ctx->is_context( boost::fibers::type::dispatcher_context) ) { + ++counter_; + } + } else { + lock_t lk(rqueue_mtx_); /*< + worker fiber, enqueue on shared queue + >*/ + rqueue_.push( ctx); } } boost::fibers::context * pick_next() noexcept { boost::fibers::context * ctx( nullptr); - if ( ! ready_queue_.empty() ) { - ctx = & ready_queue_.front(); - ready_queue_.pop_front(); + lock_t lk(rqueue_mtx_); + if ( ! rqueue_.empty() ) { /*< + pop an item from the ready queue + >*/ + ctx = rqueue_.front(); + rqueue_.pop(); + lk.unlock(); BOOST_ASSERT( nullptr != ctx); - BOOST_ASSERT( ! ctx->ready_is_linked() ); - if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) { - --counter_; + boost::fibers::context::active()->migrate( ctx); /*< + attach context to current scheduler via the active fiber + of this thread; benign if the fiber already belongs to this + thread + >*/ + } else { + lk.unlock(); + if ( ! local_queue_.empty() ) { /*< + nothing in the ready queue, return main or dispatcher fiber + >*/ + ctx = local_queue_.front(); + local_queue_.pop(); + BOOST_ASSERT ( ctx->is_context( boost::fibers::type::pinned_context) ); + if ( ctx->is_context( boost::fibers::type::dispatcher_context) ) { + --counter_; + } } } return ctx; } bool has_ready_fibers() const noexcept { - return 0 < counter_; + lock_t lock(rqueue_mtx_); + return 0 < counter_ || ! rqueue_.empty(); } void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept { @@ -125,6 +162,8 @@ public: }; boost::asio::io_service::id round_robin::service::id; +round_robin::rqueue_t round_robin::rqueue_{}; +std::mutex round_robin::rqueue_mtx_{}; }}}