diff --git a/examples/asio/detail/promise_handler.hpp b/examples/asio/detail/promise_handler.hpp index fd24e3aa..fd6c9c06 100644 --- a/examples/asio/detail/promise_handler.hpp +++ b/examples/asio/detail/promise_handler.hpp @@ -156,8 +156,8 @@ void asio_handler_invoke( Function f, promise_handler< T > * h) { fprintf(stderr, "before f()\n"); f(); fprintf(stderr, "after f()\n"); - } catch (...) { - fprintf(stderr, "p->set_exception()\n"); + } catch ( std::exception const& ec) { + fprintf(stderr, "p->set_exception() : %s\n", ec.what() ); p->set_exception( std::current_exception() ); } } diff --git a/examples/asio/echo_server.cpp b/examples/asio/echo_server.cpp index 7eadb138..9f63d297 100644 --- a/examples/asio/echo_server.cpp +++ b/examples/asio/echo_server.cpp @@ -53,14 +53,15 @@ void session( socket_ptr sock) { throw boost::system::system_error( ec); //some other error } } + std::cout << "fiber " << id << " terminates" << std::endl; } -void server( boost::asio::io_service & io_service, unsigned short port) { +void server( boost::asio::io_service & io_svc, unsigned short port) { boost::fibers::fiber::id id = boost::this_fiber::get_id(); std::cout << "fiber " << id << " : echo-server started" << std::endl; - tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port) ); + tcp::acceptor a( io_svc, tcp::endpoint( tcp::v4(), port) ); for (;;) { - socket_ptr socket( new tcp::socket( io_service) ); + socket_ptr socket( new tcp::socket( io_svc) ); boost::system::error_code ec; std::cout << "fiber " << id << " : accept new connection" << std::endl; a.async_accept( @@ -72,6 +73,7 @@ void server( boost::asio::io_service & io_service, unsigned short port) { boost::fibers::fiber( session, socket).detach(); } } + std::cout << "fiber " << id << " terminates" << std::endl; } int main( int argc, char* argv[]) { @@ -80,40 +82,50 @@ int main( int argc, char* argv[]) { std::cerr << "Usage: echo_server \n"; return EXIT_FAILURE; } - boost::asio::io_service io_service; - boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); + boost::fibers::fiber::id id = boost::this_fiber::get_id(); + std::cout << "fiber " << id << " : (main-fiber) started" << std::endl; + + boost::asio::io_service io_svc; + boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc); // server fiber - boost::fibers::fiber( - server, boost::ref( io_service), std::atoi( argv[1]) ).detach(); + boost::fibers::fiber f( + server, boost::ref( io_svc), std::atoi( argv[1]) ); // fiber unrelated to asio boost::fibers::fiber( [](){ boost::fibers::fiber::id id = boost::this_fiber::get_id(); - std::cout << "fiber " << id << " : sleeper tarted" << std::endl; - for ( int i = 0; i < 1; ++i) { + std::cout << "fiber " << id << " : sleeper started" << std::endl; + for ( int i = 0; i < 5; ++i) { std::cout << "fiber " << id << " : sleeps for 1 second" << std::endl; boost::this_fiber::sleep_for( std::chrono::seconds( 1) ); } std::cout << "fiber " << id << " : sleeps for 10 seconds" << std::endl; - boost::this_fiber::sleep_for( std::chrono::seconds( 2) ); - for ( int i = 0; i < 1; ++i) { + boost::this_fiber::sleep_for( std::chrono::seconds( 10) ); + for ( int i = 0; i < 5; ++i) { std::cout << "fiber " << id << " : sleeps for 1 second" << std::endl; boost::this_fiber::sleep_for( std::chrono::seconds( 1) ); } + std::cout << "fiber " << id << " sleeper terminates" << std::endl; } ).detach(); // fiber does shutdown the io_service - boost::fibers::fiber f([&io_service]() mutable { + boost::fibers::fiber([&io_svc,&f]() mutable { boost::fibers::fiber::id id = boost::this_fiber::get_id(); - std::cout << "fiber " << id << " : shutdown io_service in 10 seconds" << std::endl; - boost::this_fiber::sleep_for( std::chrono::seconds( 10) ); + std::cout << "fiber " << id << " : shutdown io_service in 30 seconds" << std::endl; + boost::this_fiber::sleep_for( std::chrono::seconds( 30) ); std::cout << "fiber " << id << " : shutdown" << std::endl; - io_service.stop(); - }); + // stop io_service + io_svc.stop(); + // interrupt acceptor fiber + f.interrupt(); + std::cout << "fiber " << id << " terminates" << std::endl; + }).detach(); // run io_service - io_service.run(); - // join fiber shutdown the io_service + //io_svc.run(); + boost::fibers::asio::run( io_svc); f.join(); + std::cout << "fiber " << id << " (main-fiber) terminates" << std::endl; + std::cout << "done." << std::endl; return EXIT_SUCCESS; } catch ( std::exception const& e) { std::cerr << "Exception: " << e.what() << "\n"; diff --git a/examples/asio/round_robin.hpp b/examples/asio/round_robin.hpp index 0f096273..3c0b5e18 100644 --- a/examples/asio/round_robin.hpp +++ b/examples/asio/round_robin.hpp @@ -32,35 +32,51 @@ namespace asio { class round_robin : public boost::fibers::sched_algorithm { private: - std::size_t counter_{ 0 }; - boost::asio::io_service & io_svc_; - boost::asio::steady_timer suspend_timer_; - boost::fibers::scheduler::ready_queue_t ready_queue_{}; - boost::fibers::mutex mtx_{}; - boost::fibers::condition_variable cnd_{}; + boost::asio::io_service & io_svc_; + boost::asio::steady_timer suspend_timer_; + boost::fibers::scheduler::ready_queue_t ready_queue_{}; + boost::fibers::mutex mtx_{}; + boost::fibers::condition_variable cnd_{}; + std::size_t counter_{ 0 }; public: + struct service : public boost::asio::io_service::service { + static boost::asio::io_service::id id; + + round_robin * rr{ nullptr }; + std::unique_ptr< boost::asio::io_service::work > work_{}; + + service( boost::asio::io_service & io_svc) : + boost::asio::io_service::service( io_svc), + work_{ new boost::asio::io_service::work( io_svc) } { + } + + service( service const&) = delete; + service & operator=( service const&) = delete; + + bool has_ready_fibers() const noexcept { + return rr->has_ready_fibers(); + } + + void poll() { + // block this fiber till all pending (ready) fibers are processed + // == round_robin::suspend_until() has been called + std::unique_lock< boost::fibers::mutex > lk( rr->mtx_); + rr->cnd_.wait( lk); + } + + void shutdown_service() override final { + work_.reset(); + std::unique_lock< boost::fibers::mutex > lk( rr->mtx_); + rr->cnd_.notify_all(); + } + }; + round_robin( boost::asio::io_service & io_svc) : io_svc_( io_svc), suspend_timer_( io_svc_) { - io_svc_.post([this]() mutable { - while ( ! io_svc_.stopped() ) { - if ( has_ready_fibers() ) { - // run all pending handlers in round_robin - while ( io_svc_.poll() ); - // block this fiber till all pending (ready) fibers are processed - // == round_robin::suspend_until() has been called - std::unique_lock< boost::fibers::mutex > lk( mtx_); - cnd_.wait( lk); - } else { - // run one handler inside io_service - // if no handler available, block this thread - if ( ! io_svc_.run_one() ) { - break; - } - } - } - }); + boost::asio::add_service< service >( io_svc_, new service( io_svc_) ); + boost::asio::use_service< service >( io_svc_).rr = this; } void awakened( boost::fibers::context * ctx) noexcept { @@ -105,6 +121,25 @@ public: } }; +boost::asio::io_service::id round_robin::service::id; + +void run( boost::asio::io_service & io_svc) { + BOOST_ASSERT( boost::asio::has_service< round_robin::service >( io_svc) ); + while ( ! io_svc.stopped() ) { + if ( boost::asio::use_service< round_robin::service >( io_svc).has_ready_fibers() ) { + // run all pending handlers in round_robin + while ( io_svc.poll() ); + boost::asio::use_service< round_robin::service >( io_svc).poll(); + } else { + // run one handler inside io_service + // if no handler available, block this thread + if ( ! io_svc.run_one() ) { + break; + } + } + } +} + }}} #ifdef BOOST_HAS_ABI_HEADERS