diff --git a/examples/asio/echo_server_spawn.cpp b/examples/asio/echo_server_spawn.cpp index 46ccd85d..d44fdae5 100644 --- a/examples/asio/echo_server_spawn.cpp +++ b/examples/asio/echo_server_spawn.cpp @@ -74,6 +74,10 @@ void server( boost::asio::io_service & io_service, unsigned short port) { } } +void foo( std::string const& msg) { + std::cout << "foo(): " << msg << std::endl; +} + int main( int argc, char* argv[]) { try { if ( 2 != argc) { @@ -85,7 +89,16 @@ int main( int argc, char* argv[]) { boost::fibers::asio::spawn( io_service, boost::bind( server, boost::ref( io_service), std::atoi( argv[1]) ) ); - io_service.run(); + boost::fibers::fiber( foo, "abc").detach(); + boost::fibers::fiber( + [](){ + for ( int i = 0; i < 20; ++i) { + std::cout << "loop " << i << std::endl; + boost::this_fiber::sleep_for( std::chrono::seconds( 1) ); + } + } + ).detach(); + boost::fibers::asio::run( io_service); return EXIT_SUCCESS; } catch ( std::exception const& e) { std::cerr << "Exception: " << e.what() << "\n"; diff --git a/examples/asio/echo_server_yield.cpp b/examples/asio/echo_server_yield.cpp index a800bab8..91a93c2b 100644 --- a/examples/asio/echo_server_yield.cpp +++ b/examples/asio/echo_server_yield.cpp @@ -81,7 +81,7 @@ int main( int argc, char* argv[]) { boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service); boost::fibers::fiber( server, boost::ref( io_service), std::atoi( argv[1]) ).detach(); - io_service.run(); + boost::fibers::asio::run( io_service); return EXIT_SUCCESS; } catch ( std::exception const& e) { std::cerr << "Exception: " << e.what() << "\n"; diff --git a/examples/asio/ps/server.cpp b/examples/asio/ps/server.cpp index f94971c6..fa270b8e 100644 --- a/examples/asio/ps/server.cpp +++ b/examples/asio/ps/server.cpp @@ -384,7 +384,7 @@ int main( int argc, char* argv[]) { boost::fibers::fiber( accept_subscriber, boost::ref( io_service), 9998, boost::ref( reg) ).detach(); // dispatch - io_service.run(); + boost::fibers::asio::run( io_service); return EXIT_SUCCESS; } catch ( std::exception const& e) { std::cerr << "Exception: " << e.what() << "\n"; diff --git a/examples/asio/round_robin.hpp b/examples/asio/round_robin.hpp index f638b200..eba09240 100644 --- a/examples/asio/round_robin.hpp +++ b/examples/asio/round_robin.hpp @@ -7,6 +7,7 @@ #define BOOST_FIBERS_ASIO_ROUND_ROBIN_H #include +#include #include #include @@ -26,28 +27,31 @@ namespace boost { namespace fibers { namespace asio { -class round_robin : public boost::fibers::sched_algorithm { +class round_robin : public boost::fibers::sched_algorithm, + public boost::asio::io_service::service { private: + std::size_t worker_counter_{ 0 }; boost::asio::io_service & io_svc_; boost::asio::steady_timer suspend_timer_; - boost::asio::steady_timer keepalive_timer_; - std::chrono::steady_clock::duration keepalive_interval_; boost::fibers::scheduler::ready_queue_t ready_queue_{}; public: - round_robin( boost::asio::io_service & io_svc, - std::chrono::steady_clock::duration keepalive_interval = std::chrono::milliseconds(250) ) : + static boost::asio::io_service::id id; + + round_robin( boost::asio::io_service & io_svc) : + boost::asio::io_service::service( io_svc), io_svc_( io_svc), - suspend_timer_( io_svc_), - keepalive_timer_( io_svc_), - keepalive_interval_( keepalive_interval) { - on_empty_io_service(); + suspend_timer_( io_svc_) { + boost::asio::add_service( io_svc, this); } void awakened( boost::fibers::context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ! ctx->ready_is_linked() ); ctx->ready_link( ready_queue_); + if ( ! ctx->is_dispatcher_context() ) { + ++worker_counter_; + } } boost::fibers::context * pick_next() noexcept { @@ -57,15 +61,20 @@ public: ready_queue_.pop_front(); BOOST_ASSERT( nullptr != ctx); BOOST_ASSERT( ! ctx->ready_is_linked() ); + if ( ! ctx->is_dispatcher_context() ) { + BOOST_ASSERT( 0 < worker_counter_); + --worker_counter_; + } } return ctx; } bool has_ready_fibers() const noexcept { - return ! ready_queue_.empty(); + return 0 < worker_counter_; } void suspend_until( std::chrono::steady_clock::time_point const& suspend_time) noexcept { + fprintf(stderr,"round_robin::suspend_until()\n"); suspend_timer_.expires_at( suspend_time); boost::system::error_code ignored_ec; suspend_timer_.async_wait( boost::fibers::asio::yield[ignored_ec]); @@ -75,14 +84,34 @@ public: suspend_timer_.expires_at( std::chrono::steady_clock::now() ); } - void on_empty_io_service() { + void poll() { io_svc_.post([](){ boost::this_fiber::yield(); }); - keepalive_timer_.expires_from_now( keepalive_interval_); - boost::system::error_code ignored_ec; - keepalive_timer_.async_wait( std::bind( & round_robin::on_empty_io_service, this) ); + } + + void shutdown_service() { } }; +boost::asio::io_service::id round_robin::id; + +void run( boost::asio::io_service & io_svc) { + BOOST_ASSERT( boost::asio::has_service< round_robin >( io_svc) ); + while ( ! io_svc.stopped() ) { + if ( ! boost::asio::use_service< round_robin >( io_svc).has_ready_fibers() ) { + fprintf(stderr,"before io_svc.run_one()\n"); + if ( ! io_svc.run_one() ) { + fprintf(stderr,"after io_svc.run_one() == false\n"); + break; + } + fprintf(stderr,"after io_svc.run_one() == true\n"); + } else { + while ( io_svc.poll() ) { + } + boost::asio::use_service< round_robin >( io_svc).poll(); + } + } +} + }}} #ifdef BOOST_HAS_ABI_HEADERS