diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index e329684c..ff17d249 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -54,6 +54,7 @@ explicit yield_sources ; lib boost_fibers : allocator_sources yield_sources + asio/io_service.cpp barrier.cpp condition.cpp detail/fiber_base.cpp diff --git a/examples/Jamfile.v2 b/examples/Jamfile.v2 index 7a3a8ca1..8c756560 100644 --- a/examples/Jamfile.v2 +++ b/examples/Jamfile.v2 @@ -15,6 +15,7 @@ project boost/fiber/example/cpp03 /boost/thread//boost_thread static multi + BOOST_ASIO_ENABLE_HANDLER_TRACKING ; exe barrier : barrier.cpp ; @@ -25,3 +26,4 @@ exe simple : simple.cpp ; exe segmented_stack : segmented_stack.cpp ; exe asio/daytime_client : asio/daytime_client.cpp ; +exe asio/daytime_client2 : asio/daytime_client2.cpp ; diff --git a/examples/asio/daytime_client.cpp b/examples/asio/daytime_client.cpp index 78014400..e53ace97 100644 --- a/examples/asio/daytime_client.cpp +++ b/examples/asio/daytime_client.cpp @@ -66,12 +66,13 @@ void get_daytime(boost::asio::io_service& io_service, const char* hostname) { std::cerr << e.what() << std::endl; } - io_service.stop(); + io_service.stop(); } int main( int argc, char* argv[]) { - boost::fibers::round_robin ds; + boost::asio::io_service io_service; + boost::fibers::asio::io_service ds(io_service); boost::fibers::scheduling_algorithm( & ds); try { @@ -81,11 +82,6 @@ int main( int argc, char* argv[]) return 1; } - // We run the io_service off in its own thread so that it operates - // completely asynchronously with respect to the rest of the program. - boost::asio::io_service io_service; - boost::asio::io_service::work work(io_service); - boost::fibers::fiber fiber( boost::bind( get_daytime, boost::ref( io_service), argv[1]) ); io_service.run(); fiber.join(); diff --git a/examples/asio/daytime_client2.cpp b/examples/asio/daytime_client2.cpp new file mode 100644 index 00000000..92fa7ba8 --- /dev/null +++ b/examples/asio/daytime_client2.cpp @@ -0,0 +1,86 @@ +// +// daytime_client.cpp +// ~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke + +#include + +#include +#include +#include +#include +#include +#include + +#include + +using boost::asio::ip::udp; + +void get_daytime(boost::asio::io_service& io_service, const char* hostname) +{ + try + { + udp::resolver resolver(io_service); + + udp::resolver::iterator iter = + resolver.async_resolve( + udp::resolver::query( udp::v4(), hostname, "daytime"), + boost::fibers::asio::yield); + + udp::socket socket(io_service, udp::v4()); + + boost::array send_buf = {{ 0 }}; + std::size_t send_length = + socket.async_send_to(boost::asio::buffer(send_buf), + *iter, boost::fibers::asio::yield); + (void)send_length; + + boost::array recv_buf; + udp::endpoint sender_endpoint; + std::size_t recv_length = + socket.async_receive_from( + boost::asio::buffer(recv_buf), + sender_endpoint, + boost::fibers::asio::yield); + + std::cout.write( + recv_buf.data(), + recv_length); + } + catch (boost::system::system_error& e) + { + std::cerr << e.what() << std::endl; + } + io_service.stop(); +} + +int main( int argc, char* argv[]) +{ + boost::asio::io_service io_service; + boost::fibers::asio::io_service ds(io_service); + boost::fibers::scheduling_algorithm( & ds); + try + { + if (argc != 2) + { + std::cerr << "Usage: daytime_client " << std::endl; + return 1; + } + + boost::fibers::fiber fiber( boost::bind( get_daytime, boost::ref( io_service), argv[1]) ); + io_service.run(); + fiber.join(); + } + catch ( std::exception& e) + { + std::cerr << e.what() << std::endl; + } + + return 0; +} diff --git a/include/boost/fiber/algorithm.hpp b/include/boost/fiber/algorithm.hpp index c228fb5e..2c1ae4cc 100644 --- a/include/boost/fiber/algorithm.hpp +++ b/include/boost/fiber/algorithm.hpp @@ -8,12 +8,10 @@ #include #include -#include -#include #include #include -#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX diff --git a/include/boost/fiber/all.hpp b/include/boost/fiber/all.hpp index d4dc3a7a..aec32bfa 100644 --- a/include/boost/fiber/all.hpp +++ b/include/boost/fiber/all.hpp @@ -8,7 +8,9 @@ #define BOOST_FIBERS_H #include +#include #include +#include #include #include #include diff --git a/include/boost/fiber/asio/detail/yield.hpp b/include/boost/fiber/asio/detail/yield.hpp new file mode 100644 index 00000000..f814ceab --- /dev/null +++ b/include/boost/fiber/asio/detail/yield.hpp @@ -0,0 +1,182 @@ +#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP +#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP + +#include +#include +#include +#include +#include +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { +namespace detail { + +template< typename T > +class yield_handler +{ +public: + yield_handler( yield_t const& y) : + fiber_( boost::fibers::detail::scheduler::instance().active() ), + ec_( y.ec_), value_( 0) + {} + + void operator()( T t) + { + * ec_ = boost::system::error_code(); + * value_ = t; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + + void operator()( boost::system::error_code const& ec, T t) + { + * ec_ = ec; + * value_ = t; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + +//private: + boost::fibers::detail::fiber_base::ptr_t fiber_; + boost::system::error_code * ec_; + T * value_; +}; + +// Completion handler to adapt a void promise as a completion handler. +template<> +class yield_handler< void > +{ +public: + yield_handler( yield_t const& y) : + fiber_( boost::fibers::detail::scheduler::instance().active() ), + ec_( y.ec_) + {} + + void operator()() + { + * ec_ = boost::system::error_code(); + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + + void operator()( boost::system::error_code const& ec) + { + * ec_ = ec; + fiber_->set_ready(); + boost::fibers::detail::scheduler::instance().spawn( fiber_); + } + +//private: + boost::fibers::detail::fiber_base::ptr_t fiber_; + boost::system::error_code * ec_; +}; + +} // namespace detail +} // namespace asio +} // namespace fibers +} // namespace boost + +namespace boost { +namespace asio { + +template< typename T > +class async_result< boost::fibers::asio::detail::yield_handler< T > > +{ +public: + typedef T type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) + { + out_ec_ = h.ec_; + if ( ! out_ec_) h.ec_ = & ec_; + h.value_ = & value_; + } + + type get() + { + boost::fibers::detail::scheduler::instance().active()->set_waiting(); + boost::fibers::detail::scheduler::instance().active()->suspend(); + if ( ! out_ec_ && ec_) + throw_exception( boost::system::system_error( ec_) ); + return value_; + } + +private: + boost::system::error_code * out_ec_; + boost::system::error_code ec_; + type value_; +}; + +template<> +class async_result< boost::fibers::asio::detail::yield_handler< void > > +{ +public: + typedef void type; + + explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h) + { + out_ec_ = h.ec_; + if ( ! out_ec_) h.ec_ = & ec_; + } + + void get() + { + boost::fibers::detail::scheduler::instance().active()->set_waiting(); + boost::fibers::detail::scheduler::instance().active()->suspend(); + if ( ! out_ec_ && ec_) + throw_exception( boost::system::system_error( ec_) ); + } + +private: + boost::system::error_code * out_ec_; + boost::system::error_code ec_; +}; + +// Handler type specialisation for use_future. +template< typename ReturnType > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType() +> +{ typedef boost::fibers::asio::detail::yield_handler< void > type; }; + +// Handler type specialisation for use_future. +template< typename ReturnType, typename Arg1 > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( Arg1) +> +{ typedef boost::fibers::asio::detail::yield_handler< Arg1 > type; }; + +// Handler type specialisation for use_future. +template< typename ReturnType > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( boost::system::error_code) +> +{ typedef boost::fibers::asio::detail::yield_handler< void > type; }; + +// Handler type specialisation for use_future. +template< typename ReturnType, typename Arg2 > +struct handler_type< + boost::fibers::asio::yield_t, + ReturnType( boost::system::error_code, Arg2) +> +{ typedef boost::fibers::asio::detail::yield_handler< Arg2 > type; }; + +} // namespace asio +} // namespace boost + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP diff --git a/include/boost/fiber/asio/io_service.hpp b/include/boost/fiber/asio/io_service.hpp new file mode 100644 index 00000000..556328ac --- /dev/null +++ b/include/boost/fiber/asio/io_service.hpp @@ -0,0 +1,57 @@ + +// Copyright Oliver Kowalke, Christopher M. Kohlhoff 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_ASIO_IO_SERVICE_HPP +#define BOOST_FIBERS_ASIO_IO_SERVICE_HPP + +#include + +#include +#include + +#include +#include +#include + +namespace boost { +namespace fibers { +namespace asio { + +class BOOST_FIBERS_DECL io_service : public algorithm +{ +private: + typedef std::deque< detail::fiber_base::ptr_t > wqueue_t; + typedef std::deque< boost::asio::io_service::work > wqueue_work_t; + + boost::asio::io_service& io_service_; + detail::fiber_base::ptr_t active_fiber_; + wqueue_t wqueue_; + wqueue_work_t wqueue_work_; + +public: + io_service( boost::asio::io_service & svc) BOOST_NOEXCEPT; + + ~io_service() BOOST_NOEXCEPT; + + void spawn( detail::fiber_base::ptr_t const&); + + void priority( detail::fiber_base::ptr_t const&, int); + + void join( detail::fiber_base::ptr_t const&); + + detail::fiber_base::ptr_t active() BOOST_NOEXCEPT + { return active_fiber_; } + + bool run(); + + void wait(); + + void yield(); +}; + +}}} + +#endif // BOOST_FIBERS_ASIO_IO_SERVICE_HPP diff --git a/include/boost/fiber/asio/use_future.hpp b/include/boost/fiber/asio/use_future.hpp index e0b5d517..d3e3c9ec 100644 --- a/include/boost/fiber/asio/use_future.hpp +++ b/include/boost/fiber/asio/use_future.hpp @@ -13,6 +13,8 @@ #ifndef BOOST_FIBERS_ASIO_USE_FUTURE_HPP #define BOOST_FIBERS_ASIO_USE_FUTURE_HPP +#include + #include #ifdef BOOST_HAS_ABI_HEADERS @@ -45,7 +47,7 @@ public: typedef Allocator allocator_type; /// Construct using default-constructed allocator. - use_future_t() + BOOST_CONSTEXPR use_future_t() {} /// Construct using specified allocator. @@ -70,7 +72,7 @@ private: /** * See the documentation for boost::asio::use_future_t for a usage example. */ -BOOST_CONSTEXPR use_future_t<> use_future; +BOOST_CONSTEXPR_OR_CONST use_future_t<> use_future; } // namespace asio } // namespace fibers diff --git a/include/boost/fiber/asio/yield.hpp b/include/boost/fiber/asio/yield.hpp new file mode 100644 index 00000000..cb0d8c96 --- /dev/null +++ b/include/boost/fiber/asio/yield.hpp @@ -0,0 +1,56 @@ +// +// yield.hpp +// ~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// modified by Oliver Kowalke +// + +#ifndef BOOST_FIBERS_ASIO_YIELD_HPP +#define BOOST_FIBERS_ASIO_YIELD_HPP + +#include + +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +class yield_t +{ +public: + BOOST_CONSTEXPR yield_t() : + ec_( 0) + {} + + yield_t operator[]( boost::system::error_code & ec) + { + yield_t tmp; + tmp.ec_ = & ec; + return tmp; + } + +//private: + boost::system::error_code * ec_; +}; + +BOOST_CONSTEXPR_OR_CONST yield_t yield; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#include + +#endif // BOOST_FIBERS_ASIO_YIELD_HPP diff --git a/include/boost/fiber/detail/fiber_base.hpp b/include/boost/fiber/detail/fiber_base.hpp index 92fa5ada..36032501 100644 --- a/include/boost/fiber/detail/fiber_base.hpp +++ b/include/boost/fiber/detail/fiber_base.hpp @@ -32,6 +32,8 @@ namespace boost { namespace fibers { namespace detail { +struct forced_unwind {}; + class BOOST_FIBERS_DECL fiber_base : public notify { public: diff --git a/include/boost/fiber/detail/fiber_object.hpp b/include/boost/fiber/detail/fiber_object.hpp index 344cfa89..f3ac8097 100644 --- a/include/boost/fiber/detail/fiber_object.hpp +++ b/include/boost/fiber/detail/fiber_object.hpp @@ -30,8 +30,6 @@ namespace boost { namespace fibers { namespace detail { -struct forced_unwind {}; - template< typename Fiber > void trampoline( intptr_t vp) { diff --git a/include/boost/fiber/round_robin.hpp b/include/boost/fiber/round_robin.hpp index 5b5f07e4..36d97cc6 100644 --- a/include/boost/fiber/round_robin.hpp +++ b/include/boost/fiber/round_robin.hpp @@ -6,19 +6,13 @@ #ifndef BOOST_FIBERS_DEFAULT_SCHEDULER_H #define BOOST_FIBERS_DEFAULT_SCHEDULER_H -#include #include -#include -#include -#include #include -#include -#include +#include #include #include -#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX diff --git a/src/asio/io_service.cpp b/src/asio/io_service.cpp new file mode 100644 index 00000000..d7c394a6 --- /dev/null +++ b/src/asio/io_service.cpp @@ -0,0 +1,222 @@ + +// Copyright Oliver Kowalke, Christopher M. Kohlhoff 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#define BOOST_FIBERS_SOURCE + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace asio { + +io_service::io_service( boost::asio::io_service & svc) BOOST_NOEXCEPT : + io_service_( svc), + active_fiber_() +{} + +io_service::~io_service() BOOST_NOEXCEPT +{ +#if 0 + BOOST_FOREACH( detail::fiber_base::ptr_t const& p, rqueue_) + { p->release(); } + + BOOST_FOREACH( detail::fiber_base::ptr_t const& p, wqueue_) + { p->release(); } +#endif +} + +void +io_service::spawn( detail::fiber_base::ptr_t const& f) +{ + BOOST_ASSERT( f); + BOOST_ASSERT( f->is_ready() ); + + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + try + { + // assign new fiber to active fiber + active_fiber_ = f; + // set active fiber to state_running + active_fiber_->set_running(); + // resume active fiber + active_fiber_->resume(); + // fiber is resumed + + BOOST_ASSERT( f == active_fiber_); + } + catch (...) + { + // reset active fiber to previous + active_fiber_ = tmp; + throw; + } + // reset active fiber to previous + active_fiber_ = tmp; +} + +bool +io_service::run() +{ + // loop over waiting queue + wqueue_t wqueue; + wqueue_work_t wqueue_work; + BOOST_FOREACH( detail::fiber_base::ptr_t const& f, wqueue_) + { + BOOST_ASSERT( ! f->is_running() ); + BOOST_ASSERT( ! f->is_terminated() ); + + // set fiber to state_ready if interruption was requested + // or the fiber was woken up + if ( f->interruption_requested() ) + f->set_ready(); + if ( f->is_ready() ) + { + io_service_.post( + [this, f]() + { + if ( f->is_waiting() ) + { + wqueue_.push_back( f); + wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); + } + else if ( f->is_ready() ) spawn( f); + else BOOST_ASSERT_MSG( false, "fiber with invalid state in ready-queue"); + }); + } + else + { + wqueue.push_back( f); + wqueue_work.push_back( boost::asio::io_service::work( io_service_) ); + } + } + // exchange local with global waiting queue + wqueue_.swap( wqueue); + wqueue_work_.swap( wqueue_work); + + return io_service_.run_one() > 0; +} + +void +io_service::wait() +{ + BOOST_ASSERT( active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); + + // set active_fiber to state_waiting + active_fiber_->set_waiting(); + // push active fiber to wqueue_ + wqueue_.push_back( active_fiber_); + wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend active fiber + active_fiber_->suspend(); + // fiber is resumed + + BOOST_ASSERT( tmp == active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); +} + +void +io_service::yield() +{ + BOOST_ASSERT( active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); + + // set active fiber to state_waiting + active_fiber_->set_ready(); + // push active fiber to wqueue_ + wqueue_.push_back( active_fiber_); + wqueue_work_.push_back( boost::asio::io_service::work( io_service_) ); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend acitive fiber + active_fiber_->suspend(); + // fiber is resumed + + BOOST_ASSERT( tmp == active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); +} + +void +io_service::join( detail::fiber_base::ptr_t const& f) +{ + BOOST_ASSERT( f); + BOOST_ASSERT( f != active_fiber_); + + if ( active_fiber_) + { + // set active fiber to state_waiting + active_fiber_->set_waiting(); + // push active fiber to wqueue_ + wqueue_.push_back( active_fiber_); + wqueue_work_.push_back( boost::asio::io_service::work( io_service_)); + // add active fiber to joinig-list of f + if ( ! f->join( active_fiber_) ) + // f must be already terminated therefore we set + // active fiber to state_ready + // FIXME: better state_running and no suspend + active_fiber_->set_ready(); + // store active fiber in local var + detail::fiber_base::ptr_t tmp = active_fiber_; + // suspend fiber until f terminates + active_fiber_->suspend(); + // fiber is resumed by f + + BOOST_ASSERT( tmp == active_fiber_); + BOOST_ASSERT( active_fiber_->is_running() ); + + // check if fiber was interrupted + this_fiber::interruption_point(); + } + else + { + while ( ! f->is_terminated() ) + run(); + } + + // check if joined fiber has an exception + // and rethrow exception + if ( f->has_exception() ) f->rethrow(); + + BOOST_ASSERT( f->is_terminated() ); +} + +void +io_service::priority( detail::fiber_base::ptr_t const& f, int prio) +{ + BOOST_ASSERT( f); + + // set only priority to fiber + // round-robin does not respect priorities + f->priority( prio); +} + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif diff --git a/src/round_robin.cpp b/src/round_robin.cpp index 2d6372f8..d06a1929 100644 --- a/src/round_robin.cpp +++ b/src/round_robin.cpp @@ -4,7 +4,7 @@ // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) -#define BOOST_wqueue_SOURCE +#define BOOST_FIBERS_SOURCE #include @@ -151,7 +151,7 @@ round_robin::yield() wqueue_.push_back( active_fiber_); // store active fiber in local var detail::fiber_base::ptr_t tmp = active_fiber_; - // suspend fiber + // suspend acitive fiber active_fiber_->suspend(); // fiber is resumed