2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-13 12:22:36 +00:00

use C++11

This commit is contained in:
Oliver Kowalke
2014-12-27 19:07:42 +01:00
parent ddbdd91ced
commit 2f19be6d67
126 changed files with 19488 additions and 21505 deletions

View File

@@ -1,54 +0,0 @@
# Boost.Fiber Library Examples Jamfile
# Copyright Oliver Kowalke 2013.
# Distributed under the Boost Software License, Version 1.0.
# (See accompanying file LICENSE_1_0.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)
# For more information, see http://www.boost.org/
import common ;
import feature ;
import indirect ;
import modules ;
import os ;
import toolset ;
project boost/fiber/example
: requirements
<library>../../build//boost_fiber
<library>/boost/atomic//boost_atomic
<library>/boost/coroutine//boost_coroutine
<library>/boost/system//boost_system
<library>/boost/thread//boost_thread
<toolset>gcc-4.7,<segmented-stacks>on:<cxxflags>-fsplit-stack
<toolset>gcc-4.7,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS
<toolset>gcc-4.8,<segmented-stacks>on:<cxxflags>-fsplit-stack
<toolset>gcc-4.8,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS
<link>static
<threading>multi
# <define>BOOST_ASIO_ENABLE_HANDLER_TRACKING
;
exe barrier : barrier.cpp ;
exe future : future.cpp test_future.cpp ;
exe futures_mt : futures_mt.cpp ;
exe interrupt : interrupt.cpp ;
exe join : join.cpp ;
exe ping_pong : ping_pong.cpp ;
exe segmented_stack : segmented_stack.cpp ;
exe simple : simple.cpp test_fiber.cpp ;
exe migrate_fibers
: migration/migrate_fibers.cpp
migration/workstealing_round_robin.cpp
;
exe asio/daytime_client : asio/daytime_client.cpp ;
exe asio/daytime_client2 : asio/daytime_client2.cpp ;
exe asio/echo_client : asio/echo_client.cpp ;
exe asio/echo_client2 : asio/echo_client2.cpp ;
exe asio/echo_server : asio/echo_server.cpp ;
exe asio/echo_server2 : asio/echo_server2.cpp ;
exe asio/publish_subscribe/server : asio/publish_subscribe/server.cpp ;
exe asio/publish_subscribe/publisher : asio/publish_subscribe/publisher.cpp ;
exe asio/publish_subscribe/subscriber : asio/publish_subscribe/subscriber.cpp ;

View File

@@ -1,101 +0,0 @@
//
// daytime_client.cpp
// ~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// modified by Oliver Kowalke
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/bind.hpp>
#include <boost/ref.hpp>
#include <boost/system/system_error.hpp>
#include <boost/fiber/all.hpp>
#include "loop.hpp"
#include "spawn.hpp"
#include "use_future.hpp"
using boost::asio::ip::udp;
void get_daytime(boost::asio::io_service& io_service, const char* hostname)
{
try
{
udp::resolver resolver(io_service);
boost::fibers::future<udp::resolver::iterator> iter =
resolver.async_resolve(
udp::resolver::query( udp::v4(), hostname, "daytime"),
boost::fibers::asio::use_future);
// The async_resolve operation above returns the endpoint iterator as a
// future value that is not retrieved ...
udp::socket socket(io_service, udp::v4());
boost::array<char, 1> send_buf = {{ 0 }};
boost::fibers::future<std::size_t> send_length =
socket.async_send_to(boost::asio::buffer(send_buf),
*iter.get(), // ... until here. This call may block.
boost::fibers::asio::use_future);
// Do other things here while the send completes.
send_length.get(); // Blocks until the send is complete. Throws any errors.
boost::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
boost::fibers::future<std::size_t> recv_length =
socket.async_receive_from(
boost::asio::buffer(recv_buf),
sender_endpoint,
boost::fibers::asio::use_future);
// Do other things here while the receive completes.
std::cout.write(
recv_buf.data(),
recv_length.get()); // Blocks until receive is complete.
}
catch (boost::system::system_error& e)
{
std::cerr << e.what() << std::endl;
}
io_service.stop();
}
int main( int argc, char* argv[])
{
boost::asio::io_service io_service;
try
{
if (argc != 2)
{
std::cerr << "Usage: daytime_client <host>" << std::endl;
return 1;
}
boost::fibers::asio::spawn( io_service,
boost::bind( get_daytime,
boost::ref( io_service), argv[1]) );
boost::fibers::fiber f(
boost::bind( boost::fibers::asio::run_service, boost::ref( io_service) ) );
f.join();
}
catch ( std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}

View File

@@ -1,92 +0,0 @@
//
// daytime_client.cpp
// ~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// modified by Oliver Kowalke
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/bind.hpp>
#include <boost/ref.hpp>
#include <boost/system/system_error.hpp>
#include <boost/fiber/all.hpp>
#include "loop.hpp"
#include "spawn.hpp"
#include "yield.hpp"
using boost::asio::ip::udp;
void get_daytime(boost::asio::io_service& io_service, const char* hostname)
{
try
{
udp::resolver resolver(io_service);
udp::resolver::iterator iter =
resolver.async_resolve(
udp::resolver::query( udp::v4(), hostname, "daytime"),
boost::fibers::asio::yield);
udp::socket socket(io_service, udp::v4());
boost::array<char, 1> send_buf = {{ 0 }};
std::size_t send_length =
socket.async_send_to(boost::asio::buffer(send_buf),
*iter, boost::fibers::asio::yield);
(void)send_length;
boost::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
std::size_t recv_length =
socket.async_receive_from(
boost::asio::buffer(recv_buf),
sender_endpoint,
boost::fibers::asio::yield);
std::cout.write(
recv_buf.data(),
recv_length);
}
catch (boost::system::system_error& e)
{
std::cerr << e.what() << std::endl;
}
io_service.stop();
}
int main( int argc, char* argv[])
{
boost::asio::io_service io_service;
try
{
if (argc != 2)
{
std::cerr << "Usage: daytime_client <host>" << std::endl;
return 1;
}
boost::fibers::asio::spawn( io_service,
boost::bind( get_daytime,
boost::ref( io_service), argv[1]) );
boost::fibers::fiber f(
boost::bind( boost::fibers::asio::run_service, boost::ref( io_service) ) );
f.join();
}
catch ( std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}

View File

@@ -1,352 +0,0 @@
//
// detail/spawn.hpp
// ~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP
#define BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/config.hpp>
#include <boost/asio/detail/handler_alloc_helpers.hpp>
#include <boost/asio/detail/handler_cont_helpers.hpp>
#include <boost/asio/detail/handler_invoke_helpers.hpp>
#include <boost/asio/detail/noncopyable.hpp>
#include <boost/asio/detail/shared_ptr.hpp>
#include <boost/asio/handler_type.hpp>
#include <boost/config.hpp>
#include <boost/fiber/detail/scheduler.hpp>
#include <boost/fiber/fiber.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
namespace detail {
template< typename Handler, typename T >
class fiber_handler
{
public:
fiber_handler( basic_yield_context< Handler > ctx) :
fiber_( ctx.fiber_),
handler_( ctx.handler_),
ec_( ctx.ec_),
value_( 0)
{}
void operator()( T value)
{
* ec_ = boost::system::error_code();
* value_ = value;
fiber_->set_ready();
}
void operator()( boost::system::error_code ec, T value)
{
* ec_ = ec;
* value_ = value;
fiber_->set_ready();
}
//private:
boost::fibers::detail::fiber_base * fiber_;
Handler & handler_;
boost::system::error_code * ec_;
T * value_;
};
template< typename Handler >
class fiber_handler< Handler, void >
{
public:
fiber_handler( basic_yield_context< Handler > ctx) :
fiber_( ctx.fiber_),
handler_( ctx.handler_),
ec_( ctx.ec_)
{}
void operator()()
{
* ec_ = boost::system::error_code();
fiber_->set_ready();
}
void operator()( boost::system::error_code ec)
{
* ec_ = ec;
fiber_->set_ready();
}
//private:
boost::fibers::detail::fiber_base * fiber_;
Handler & handler_;
boost::system::error_code * ec_;
};
template< typename Handler, typename T >
void* asio_handler_allocate( std::size_t size,
fiber_handler< Handler, T > * this_handler)
{
return boost_asio_handler_alloc_helpers::allocate(
size, this_handler->handler_);
}
template< typename Handler, typename T >
void asio_handler_deallocate( void* pointer, std::size_t size,
fiber_handler< Handler, T > * this_handler)
{
boost_asio_handler_alloc_helpers::deallocate(
pointer, size, this_handler->handler_);
}
template< typename Handler, typename T >
bool asio_handler_is_continuation( fiber_handler<Handler, T> *)
{ return true; }
template< typename Function, typename Handler, typename T >
void asio_handler_invoke( Function & function,
fiber_handler< Handler, T > * this_handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, this_handler->handler_);
}
template< typename Function, typename Handler, typename T >
void asio_handler_invoke( Function const& function,
fiber_handler< Handler, T > * this_handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, this_handler->handler_);
}
} // namespace detail
} // namespace asio
} // namespace fibers
namespace asio {
#if !defined(GENERATING_DOCUMENTATION)
template< typename Handler, typename ReturnType >
struct handler_type<
boost::fibers::asio::basic_yield_context< Handler >,
ReturnType()
>
{ typedef boost::fibers::asio::detail::fiber_handler< Handler, void > type; };
template< typename Handler, typename ReturnType, typename Arg1 >
struct handler_type<
boost::fibers::asio::basic_yield_context< Handler >,
ReturnType( Arg1)
>
{ typedef boost::fibers::asio::detail::fiber_handler< Handler, Arg1 > type; };
template< typename Handler, typename ReturnType >
struct handler_type<
boost::fibers::asio::basic_yield_context< Handler >,
ReturnType( boost::system::error_code)
>
{ typedef boost::fibers::asio::detail::fiber_handler <Handler, void > type; };
template< typename Handler, typename ReturnType, typename Arg2 >
struct handler_type<
boost::fibers::asio::basic_yield_context< Handler >,
ReturnType( boost::system::error_code, Arg2)
>
{ typedef boost::fibers::asio::detail::fiber_handler< Handler, Arg2 > type; };
template< typename Handler, typename T >
class async_result< boost::fibers::asio::detail::fiber_handler< Handler, T > >
{
public:
typedef T type;
explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, T > & h) :
out_ec_( 0), ec_(), value_()
{
out_ec_ = h.ec_;
if ( ! out_ec_) h.ec_ = & ec_;
h.value_ = & value_;
}
type get()
{
fibers::detail::spinlock splk;
unique_lock< fibers::detail::spinlock > lk( splk);
boost::fibers::fm_wait(lk);
if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_);
return value_;
}
private:
boost::system::error_code * out_ec_;
boost::system::error_code ec_;
type value_;
};
template< typename Handler >
class async_result< boost::fibers::asio::detail::fiber_handler< Handler, void > >
{
public:
typedef void type;
explicit async_result( boost::fibers::asio::detail::fiber_handler< Handler, void > & h) :
out_ec_( 0), ec_()
{
out_ec_ = h.ec_;
if (!out_ec_) h.ec_ = &ec_;
}
void get()
{
fibers::detail::spinlock splk;
unique_lock< fibers::detail::spinlock > lk( splk);
boost::fibers::fm_wait(lk);
if ( ! out_ec_ && ec_) throw boost::system::system_error( ec_);
}
private:
boost::system::error_code * out_ec_;
boost::system::error_code ec_;
};
} // namespace asio
namespace fibers {
namespace asio {
namespace detail {
template< typename Handler, typename Function >
struct spawn_data : private noncopyable
{
spawn_data( boost::asio::io_service& io_svc, BOOST_ASIO_MOVE_ARG( Handler) handler,
bool call_handler, BOOST_ASIO_MOVE_ARG( Function) function) :
io_svc_(io_svc),
handler_( BOOST_ASIO_MOVE_CAST( Handler)( handler) ),
call_handler_( call_handler),
function_( BOOST_ASIO_MOVE_CAST( Function)( function) )
{}
boost::asio::io_service& io_svc_;
boost::fibers::detail::fiber_base* fiber_;
Handler handler_;
bool call_handler_;
Function function_;
};
template< typename Handler, typename Function >
struct fiber_entry_point
{
void operator()()
{
shared_ptr< spawn_data< Handler, Function > > data( data_);
data->fiber_ = boost::fibers::fm_active();
const basic_yield_context< Handler > yield(
data->fiber_, data->handler_);
boost::asio::io_service::work w(data->io_svc_);
( data->function_)( yield);
if ( data->call_handler_)
( data->handler_)();
}
shared_ptr< spawn_data< Handler, Function > > data_;
};
template< typename Handler, typename Function >
struct spawn_helper
{
void operator()()
{
fiber_entry_point< Handler, Function > entry_point = { data_ };
boost::fibers::fiber fiber( attributes_, entry_point);
fiber.detach();
}
shared_ptr< spawn_data< Handler, Function > > data_;
boost::fibers::attributes attributes_;
};
inline void default_spawn_handler() {}
} // namespace detail
template< typename Handler, typename Function >
void spawn( boost::asio::io_service& io_service,
BOOST_ASIO_MOVE_ARG( Handler) handler,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes)
{
detail::spawn_helper< Handler, Function > helper;
helper.data_.reset(
new detail::spawn_data< Handler, Function >(
io_service,
BOOST_ASIO_MOVE_CAST( Handler)( handler), true,
BOOST_ASIO_MOVE_CAST( Function)( function) ) );
helper.attributes_ = attributes;
boost_asio_handler_invoke_helpers::invoke(
helper, helper.data_->handler_);
}
template< typename Handler, typename Function >
void spawn( basic_yield_context< Handler > ctx,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes)
{
Handler handler( ctx.handler_); // Explicit copy that might be moved from.
detail::spawn_helper< Handler, Function > helper;
helper.data_.reset(
new detail::spawn_data< Handler, Function >(
BOOST_ASIO_MOVE_CAST( Handler)( handler), false,
BOOST_ASIO_MOVE_CAST( Function)( function) ) );
helper.attributes_ = attributes;
boost_asio_handler_invoke_helpers::invoke(
helper, helper.data_->handler_);
}
template< typename Function >
void spawn( boost::asio::io_service::strand strand,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes)
{
boost::fibers::asio::spawn(
strand.get_io_service(),
strand.wrap( & detail::default_spawn_handler),
BOOST_ASIO_MOVE_CAST( Function)( function),
attributes);
}
template< typename Function >
void spawn( boost::asio::io_service & io_service,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes)
{
boost::fibers::asio::spawn(
boost::asio::io_service::strand( io_service),
BOOST_ASIO_MOVE_CAST( Function)( function),
attributes);
}
#endif // !defined(GENERATING_DOCUMENTATION)
} // namespace asio
} // namespace fibers
} // namespace boost
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_ASIO_DETAIL_SPAWN_HPP

View File

@@ -1,190 +0,0 @@
//
// use_future.hpp
// ~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// modified by Oliver Kowalke
//
#ifndef BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP
#define BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP
#include <memory>
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/config.hpp>
#include <boost/asio/handler_type.hpp>
#include <boost/make_shared.hpp>
#include <boost/move/move.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#include <boost/thread/detail/memory.hpp>
#include <boost/throw_exception.hpp>
#include <boost/fiber/future/future.hpp>
#include <boost/fiber/future/promise.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace asio {
namespace detail {
// Completion handler to adapt a promise as a completion handler.
template< typename T >
class promise_handler
{
public:
// Construct from use_future special value.
template< typename Allocator >
promise_handler( boost::fibers::asio::use_future_t< Allocator > uf) :
promise_( boost::allocate_shared< boost::fibers::promise< T > >(
uf.get_allocator(), boost::allocator_arg, uf.get_allocator() ) )
{}
void operator()( T t)
{
promise_->set_value( t);
//boost::fibers::fm_run();
}
void operator()( boost::system::error_code const& ec, T t)
{
if (ec)
promise_->set_exception(
boost::copy_exception(
boost::system::system_error( ec) ) );
else
promise_->set_value( t);
// scheduler::run() resumes a ready fiber
// invoke scheduler::run() until no fiber was resumed
//boost::fibers::fm_run();
}
//private:
boost::shared_ptr< boost::fibers::promise< T > > promise_;
};
// Completion handler to adapt a void promise as a completion handler.
template<>
class promise_handler< void >
{
public:
// Construct from use_future special value. Used during rebinding.
template< typename Allocator >
promise_handler( boost::fibers::asio::use_future_t< Allocator > uf) :
promise_( boost::allocate_shared< boost::fibers::promise< void > >(
uf.get_allocator(), boost::allocator_arg, uf.get_allocator() ) )
{}
void operator()()
{
promise_->set_value();
//boost::fibers::fm_run();
}
void operator()( boost::system::error_code const& ec)
{
if ( ec)
promise_->set_exception(
boost::copy_exception(
boost::system::system_error( ec) ) );
else
promise_->set_value();
// scheduler::run() resumes a ready fiber
// invoke scheduler::run() until no fiber was resumed
//boost::fibers::fm_run();
}
//private:
boost::shared_ptr< boost::fibers::promise< void > > promise_;
};
// Ensure any exceptions thrown from the handler are propagated back to the
// caller via the future.
template< typename Function, typename T >
void asio_handler_invoke( Function f, promise_handler< T > * h)
{
boost::shared_ptr< boost::fibers::promise< T > > p( h->promise_);
try
{ f(); }
catch (...)
{ p->set_exception( boost::current_exception() ); }
}
} // namespace detail
#if !defined(GENERATING_DOCUMENTATION)
// Handler traits specialisation for promise_handler.
template< typename T >
class async_result< detail::promise_handler< T > >
{
public:
// The initiating function will return a future.
typedef boost::fibers::future< T > type;
// Constructor creates a new promise for the async operation, and obtains the
// corresponding future.
explicit async_result( detail::promise_handler< T > & h)
{ value_ = h.promise_->get_future(); }
// Obtain the future to be returned from the initiating function.
type get()
{ return boost::move( value_); }
private:
type value_;
};
// Handler type specialisation for use_future.
template< typename Allocator, typename ReturnType >
struct handler_type<
boost::fibers::asio::use_future_t< Allocator>,
ReturnType()
>
{ typedef detail::promise_handler< void > type; };
// Handler type specialisation for use_future.
template< typename Allocator, typename ReturnType, typename Arg1 >
struct handler_type<
boost::fibers::asio::use_future_t< Allocator >,
ReturnType( Arg1)
>
{ typedef detail::promise_handler< Arg1 > type; };
// Handler type specialisation for use_future.
template< typename Allocator, typename ReturnType >
struct handler_type<
boost::fibers::asio::use_future_t< Allocator >,
ReturnType( boost::system::error_code)
>
{ typedef detail::promise_handler< void > type; };
// Handler type specialisation for use_future.
template< typename Allocator, typename ReturnType, typename Arg2 >
struct handler_type<
boost::fibers::asio::use_future_t< Allocator >,
ReturnType( boost::system::error_code, Arg2)
>
{ typedef detail::promise_handler< Arg2 > type; };
#endif // !defined(GENERATING_DOCUMENTATION)
} // namespace asio
} // namespace boost
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_ASIO_DETAIL_USE_FUTURE_HPP

View File

@@ -1,180 +0,0 @@
#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP
#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/config.hpp>
#include <boost/asio/handler_type.hpp>
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#include <boost/throw_exception.hpp>
#include <boost/fiber/all.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
namespace detail {
template< typename T >
class yield_handler
{
public:
yield_handler( yield_t const& y) :
fiber_( boost::fibers::fm_active() ),
ec_( y.ec_), value_( 0)
{}
void operator()( T t)
{
* ec_ = boost::system::error_code();
* value_ = t;
fiber_->set_ready();
}
void operator()( boost::system::error_code const& ec, T t)
{
* ec_ = ec;
* value_ = t;
fiber_->set_ready();
}
//private:
boost::fibers::detail::fiber_base * fiber_;
boost::system::error_code * ec_;
T * value_;
};
// Completion handler to adapt a void promise as a completion handler.
template<>
class yield_handler< void >
{
public:
yield_handler( yield_t const& y) :
fiber_( boost::fibers::fm_active() ),
ec_( y.ec_)
{}
void operator()()
{
* ec_ = boost::system::error_code();
fiber_->set_ready();
}
void operator()( boost::system::error_code const& ec)
{
* ec_ = ec;
fiber_->set_ready();
}
//private:
boost::fibers::detail::fiber_base * fiber_;
boost::system::error_code * ec_;
};
} // namespace detail
} // namespace asio
} // namespace fibers
} // namespace boost
namespace boost {
namespace asio {
template< typename T >
class async_result< boost::fibers::asio::detail::yield_handler< T > >
{
public:
typedef T type;
explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h)
{
out_ec_ = h.ec_;
if ( ! out_ec_) h.ec_ = & ec_;
h.value_ = & value_;
}
type get()
{
fibers::detail::spinlock splk;
unique_lock< fibers::detail::spinlock > lk( splk);
boost::fibers::fm_wait(lk);
if ( ! out_ec_ && ec_)
throw_exception( boost::system::system_error( ec_) );
return value_;
}
private:
boost::system::error_code * out_ec_;
boost::system::error_code ec_;
type value_;
};
template<>
class async_result< boost::fibers::asio::detail::yield_handler< void > >
{
public:
typedef void type;
explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h)
{
out_ec_ = h.ec_;
if ( ! out_ec_) h.ec_ = & ec_;
}
void get()
{
fibers::detail::spinlock splk;
unique_lock< fibers::detail::spinlock > lk( splk);
boost::fibers::fm_wait(lk);
if ( ! out_ec_ && ec_)
throw_exception( boost::system::system_error( ec_) );
}
private:
boost::system::error_code * out_ec_;
boost::system::error_code ec_;
};
// Handler type specialisation for use_future.
template< typename ReturnType >
struct handler_type<
boost::fibers::asio::yield_t,
ReturnType()
>
{ typedef boost::fibers::asio::detail::yield_handler< void > type; };
// Handler type specialisation for use_future.
template< typename ReturnType, typename Arg1 >
struct handler_type<
boost::fibers::asio::yield_t,
ReturnType( Arg1)
>
{ typedef boost::fibers::asio::detail::yield_handler< Arg1 > type; };
// Handler type specialisation for use_future.
template< typename ReturnType >
struct handler_type<
boost::fibers::asio::yield_t,
ReturnType( boost::system::error_code)
>
{ typedef boost::fibers::asio::detail::yield_handler< void > type; };
// Handler type specialisation for use_future.
template< typename ReturnType, typename Arg2 >
struct handler_type<
boost::fibers::asio::yield_t,
ReturnType( boost::system::error_code, Arg2)
>
{ typedef boost::fibers::asio::detail::yield_handler< Arg2 > type; };
} // namespace asio
} // namespace boost
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP

View File

@@ -1,59 +0,0 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
enum { max_length = 1024 };
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: blocking_tcp_echo_client <host> <port>\n";
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), argv[1], argv[2]);
tcp::resolver::iterator iterator = resolver.resolve(query);
tcp::socket s(io_service);
boost::asio::connect(s, iterator);
using namespace std; // For strlen.
std::cout << "Enter message: ";
char request[max_length];
std::cin.getline(request, max_length);
size_t request_length = strlen(request);
boost::asio::write(s, boost::asio::buffer(request, request_length));
char reply[max_length];
size_t reply_length = boost::asio::read(s,
boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -1,63 +0,0 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
enum { max_length = 1024 };
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: blocking_tcp_echo_client <host> <port>\n";
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), argv[1], argv[2]);
tcp::resolver::iterator iterator = resolver.resolve(query);
tcp::socket s(io_service);
boost::asio::connect(s, iterator);
using namespace std; // For strlen.
std::cout << "Enter message: ";
char request[max_length];
std::cin.getline(request, max_length);
size_t request_length = strlen(request);
boost::asio::write(s, boost::asio::buffer(request, request_length));
char reply[max_length];
size_t reply_length = boost::asio::read(s,
boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\nWe block for 10 seconds in order to let session timeout\n";
boost::this_thread::sleep( boost::posix_time::seconds( 10) );
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -1,105 +0,0 @@
//
// echo_server.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
// 2013 Oliver Kowalke
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/fiber/all.hpp>
#include "loop.hpp"
#include "spawn.hpp"
#include "yield.hpp"
using boost::asio::ip::tcp;
const int max_length = 1024;
typedef boost::shared_ptr< tcp::socket > socket_ptr;
void session( socket_ptr sock)
{
try
{
for (;;)
{
char data[max_length];
boost::system::error_code ec;
std::size_t length = sock->async_read_some(
boost::asio::buffer( data),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof)
break; //connection closed cleanlyby peer
else if ( ec)
throw boost::system::system_error( ec); //some other error
boost::asio::async_write(
* sock,
boost::asio::buffer( data, length),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof)
break; //connection closed cleanlyby peer
else if ( ec)
throw boost::system::system_error( ec); //some other error
}
}
catch ( std::exception const& e)
{ std::cerr << "Exception in fiber: " << e.what() << "\n"; }
}
void server( boost::asio::io_service & io_service, unsigned short port)
{
tcp::acceptor a( io_service, tcp::endpoint( tcp::v4(), port) );
for (;;)
{
socket_ptr socket( new tcp::socket( io_service) );
boost::system::error_code ec;
std::cout << "wait for accept" << std::endl;
a.async_accept(
* socket,
boost::fibers::asio::yield[ec]);
std::cout << "accepted" << std::endl;
if ( ! ec) {
boost::fibers::fiber(
boost::bind( session, socket) ).detach();
}
}
}
int main( int argc, char* argv[])
{
try
{
if ( argc != 2)
{
std::cerr << "Usage: blocking_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_service io_service;
using namespace std; // For atoi.
boost::fibers::fiber(
boost::bind( server, boost::ref( io_service), atoi( argv[1]) ) ).detach();
boost::fibers::fiber f(
boost::bind( boost::fibers::asio::run_service, boost::ref( io_service) ) );
f.join();
}
catch ( std::exception const& e)
{ std::cerr << "Exception: " << e.what() << "\n"; }
return 0;
}

View File

@@ -1,140 +0,0 @@
//
// echo_server2.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
// 2013 Oliver Kowalke
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
//
#include <cstdlib>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/fiber/all.hpp>
#include "loop.hpp"
#include "spawn.hpp"
#include "yield.hpp"
using boost::asio::ip::tcp;
const int max_length = 1024;
class session : public boost::enable_shared_from_this< session >
{
public:
explicit session( boost::asio::io_service & io_service) :
strand_( io_service),
socket_( io_service),
timer_( io_service)
{}
tcp::socket& socket()
{ return socket_; }
void go()
{
boost::fibers::asio::spawn(strand_,
boost::bind(&session::echo,
shared_from_this(), _1));
boost::fibers::asio::spawn(strand_,
boost::bind(&session::timeout,
shared_from_this(), _1));
}
private:
void echo( boost::fibers::asio::yield_context yield)
{
try
{
char data[max_length];
for (;;)
{
timer_.expires_from_now(
boost::posix_time::seconds( 3) );
std::size_t n = socket_.async_read_some(
boost::asio::buffer( data),
boost::fibers::asio::yield);
boost::asio::async_write(
socket_,
boost::asio::buffer( data, n),
boost::fibers::asio::yield);
}
}
catch ( std::exception const& e)
{
socket_.close();
timer_.cancel();
}
}
void timeout( boost::fibers::asio::yield_context yield)
{
while ( socket_.is_open() )
{
boost::system::error_code ignored_ec;
timer_.async_wait( boost::fibers::asio::yield[ignored_ec]);
if ( timer_.expires_from_now() <= boost::posix_time::seconds( 0) ) {
std::cout << "session to " << socket_.remote_endpoint() << " timed out" << std::endl;
socket_.close();
}
}
}
boost::asio::io_service::strand strand_;
tcp::socket socket_;
boost::asio::deadline_timer timer_;
};
void do_accept(boost::asio::io_service& io_service,
unsigned short port, boost::fibers::asio::yield_context yield)
{
tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
for (;;)
{
boost::system::error_code ec;
boost::shared_ptr< session > new_session( new session( io_service) );
acceptor.async_accept(
new_session->socket(),
boost::fibers::asio::yield[ec]);
if ( ! ec) {
boost::fibers::fiber( boost::bind( & session::go, new_session) ).detach();
}
}
}
int main( int argc, char* argv[])
{
try
{
if ( argc != 2)
{
std::cerr << "Usage: echo_server <port>\n";
return 1;
}
boost::asio::io_service io_service;
using namespace std; // For atoi.
boost::fibers::asio::spawn( io_service,
boost::bind( do_accept,
boost::ref( io_service), atoi( argv[1]), _1) );
boost::fibers::fiber f(
boost::bind( boost::fibers::asio::run_service, boost::ref( io_service) ) );
f.join();
}
catch ( std::exception const& e)
{ std::cerr << "Exception: " << e.what() << "\n"; }
return 0;
}

View File

@@ -1,41 +0,0 @@
// Copyright Eugene Yakubovich 2014.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/bind.hpp>
#include <boost/chrono/system_clocks.hpp>
#include <boost/config.hpp>
#include <boost/ref.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/scheduler.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
inline void timer_handler( boost::asio::high_resolution_timer & timer) {
boost::fibers::fm_yield();
timer.expires_at( boost::fibers::fm_next_wakeup() );
timer.async_wait( boost::bind( timer_handler, boost::ref( timer) ) );
}
inline void run_service( boost::asio::io_service & io_service) {
boost::asio::high_resolution_timer timer( io_service, boost::chrono::seconds(0) );
timer.async_wait( boost::bind( timer_handler, boost::ref( timer) ) );
io_service.run();
}
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

View File

@@ -1,62 +0,0 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
enum { max_length = 1024 };
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: publisher <host> <channel>\n";
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), argv[1], "9997");
tcp::resolver::iterator iterator = resolver.resolve(query);
tcp::socket s(io_service);
boost::asio::connect(s, iterator);
char msg[max_length];
std::string channel(argv[2]);
std::memset(msg, '\0', max_length);
std::memcpy(msg, channel.c_str(), channel.size() );
boost::asio::write(s, boost::asio::buffer(msg, max_length));
for (;;)
{
std::cout << "publish: ";
char request[max_length];
std::cin.getline(request, max_length);
boost::asio::write(s, boost::asio::buffer(request, max_length));
}
}
catch ( std::exception const& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -1,426 +0,0 @@
//
// server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2013 Oliver Kowalke
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
//
#include <cstddef>
#include <cstdlib>
#include <map>
#include <set>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/all.hpp>
#include "../loop.hpp"
#include "../spawn.hpp"
#include "../yield.hpp"
using boost::asio::ip::tcp;
const std::size_t max_length = 1024;
class subscriber_session;
typedef boost::shared_ptr<subscriber_session> subscriber_session_ptr;
// a channel has n subscribers (subscriptions)
// this class holds a list of subcribers for one channel
class subscriptions
{
public:
~subscriptions();
// subscribe to this channel
void subscribe( subscriber_session_ptr const& s)
{ subscribers_.insert( s); }
// unsubscribe from this channel
void unsubscribe( subscriber_session_ptr const& s)
{ subscribers_.erase(s); }
// publish a message, e.g. push this message to all subscribers
void publish( std::string const& msg);
private:
// list of subscribers
std::set<subscriber_session_ptr> subscribers_;
};
// a class to register channels and to subsribe clients to this channels
class registry : private boost::noncopyable
{
private:
typedef std::map< std::string, boost::shared_ptr< subscriptions > > channels_cont;
typedef channels_cont::iterator channels_iter;
boost::fibers::mutex mtx_;
channels_cont channels_;
void register_channel_( std::string const& channel)
{
if ( channels_.end() != channels_.find( channel) )
throw std::runtime_error("channel already exists");
channels_[channel] = boost::make_shared< subscriptions >();
std::cout << "new channel '" << channel << "' registered" << std::endl;
}
void unregister_channel_( std::string const& channel)
{
channels_.erase( channel);
std::cout << "channel '" << channel << "' unregistered" << std::endl;
}
void subscribe_( std::string const& channel, subscriber_session_ptr s)
{
channels_iter iter = channels_.find( channel);
if ( channels_.end() == iter )
throw std::runtime_error("channel does not exist");
iter->second->subscribe( s);
std::cout << "new subscription to channel '" << channel << "'" << std::endl;
}
void unsubscribe_( std::string const& channel, subscriber_session_ptr s)
{
channels_iter iter = channels_.find( channel);
if ( channels_.end() != iter )
iter->second->unsubscribe( s);
}
void publish_( std::string const& channel, std::string const& msg)
{
channels_iter iter = channels_.find( channel);
if ( channels_.end() == iter )
throw std::runtime_error("channel does not exist");
iter->second->publish( msg);
std::cout << "message '" << msg << "' to publish on channel '" << channel << "'" << std::endl;
}
public:
// add a channel to registry
void register_channel( std::string const& channel)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
register_channel_( channel);
}
// remove a channel from registry
void unregister_channel( std::string const& channel)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
unregister_channel_( channel);
}
// subscribe to a channel
void subscribe( std::string const& channel, subscriber_session_ptr s)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
subscribe_( channel, s);
}
// unsubscribe from a channel
void unsubscribe( std::string const& channel, subscriber_session_ptr s)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
unsubscribe_( channel, s);
}
// publish a message to all subscribers registerd to the channel
void publish( std::string const& channel, std::string const& msg)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
publish_( channel, msg);
}
};
// a subscriber subscribes to a given channel in order to receive messages published on this channel
class subscriber_session : public boost::enable_shared_from_this< subscriber_session >
{
public:
explicit subscriber_session( boost::asio::io_service & io_service, registry & reg) :
socket_( io_service),
reg_( reg)
{}
tcp::socket& socket()
{ return socket_; }
// this function is executed inside the fiber
void run( boost::fibers::asio::yield_context yield)
{
std::string channel;
try
{
boost::system::error_code ec;
// read first message == channel name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data_),
yield[ec]);
if ( ec) throw std::runtime_error("no channel from subscriber");
// first message ist equal to the channel name the publisher
// publishes to
channel = data_;
// subscribe to new channel
reg_.subscribe( channel, shared_from_this() );
// read published messages
for (;;)
{
// wait for a conditon-variable for new messages
// the fiber will be suspended until the condtion
// becomes true and the fiber is resumed
// published message is stored in buffer 'data_'
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
cond_.wait( lk);
std::string data( data_);
lk.unlock();
std::cout << "subscriber::run(): '" << data << std::endl;
// message '<fini>' terminates subscription
if ( "<fini>" == data) break;
// async. write message to socket connected with
// subscriber
// async_write() returns if the complete message was writen
// the fiber is suspended in the meanwhile
boost::asio::async_write(
socket_,
boost::asio::buffer( data, data.size() ),
yield[ec]);
if ( ec == boost::asio::error::eof)
break; //connection closed cleanly by peer
else if ( ec)
throw boost::system::system_error( ec); //some other error
std::cout << "subscriber::run(): '" << data << " written" << std::endl;
}
}
catch ( std::exception const& e)
{ std::cerr << "subscriber [" << channel << "] failed: " << e.what() << std::endl; }
// close socket
socket_.close();
// unregister channel
reg_.unsubscribe( channel, shared_from_this() );
}
// called from publisher_session (running in other fiber)
void publish( std::string const& msg)
{
boost::unique_lock< boost::fibers::mutex > lk( mtx_);
std::memset(data_, '\0', sizeof( data_));
std::memcpy(data_, msg.c_str(), (std::min)(max_length, msg.size()));
cond_.notify_one();
}
private:
tcp::socket socket_;
registry & reg_;
boost::fibers::mutex mtx_;
boost::fibers::condition_variable cond_;
// fixed size message
char data_[max_length];
};
subscriptions::~subscriptions()
{
BOOST_FOREACH( subscriber_session_ptr s, subscribers_)
{ s->publish("<fini>"); }
}
void
subscriptions::publish( std::string const& msg)
{
BOOST_FOREACH( subscriber_session_ptr s, subscribers_)
{ s->publish( msg); }
}
// a publisher publishes messages on its channel
// subscriber might register to this channel to get the published messages
class publisher_session : public boost::enable_shared_from_this< publisher_session >
{
public:
explicit publisher_session( boost::asio::io_service & io_service, registry & reg) :
socket_( io_service),
reg_( reg)
{}
tcp::socket& socket()
{ return socket_; }
// this function is executed inside the fiber
void run( boost::fibers::asio::yield_context yield)
{
std::string channel;
try
{
boost::system::error_code ec;
// fixed size message
char data[max_length];
// read first message == channel name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data),
yield[ec]);
if ( ec) throw std::runtime_error("no channel from publisher");
// first message ist equal to the channel name the publisher
// publishes to
channel = data;
// register the new channel
reg_.register_channel( channel);
// start publishing messages
for (;;)
{
// read message from publisher asyncronous
// async_read() suspends this fiber until the complete emssage is read
// and stored in the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data),
yield[ec]);
if ( ec == boost::asio::error::eof)
break; //connection closed cleanly by peer
else if ( ec)
throw boost::system::system_error( ec); //some other error
// publish message to all subscribers
reg_.publish( channel, std::string( data) );
}
}
catch ( std::exception const& e)
{ std::cerr << "publisher [" << channel << "] failed: " << e.what() << std::endl; }
// close socket
socket_.close();
// unregister channel
reg_.unregister_channel( channel);
}
private:
tcp::socket socket_;
registry & reg_;
};
typedef boost::shared_ptr< publisher_session > publisher_session_ptr;
// function accepts connections requests from clients acting as a publisher
void accept_publisher( boost::asio::io_service& io_service,
unsigned short port,
registry & reg,
boost::fibers::asio::yield_context yield)
{
// create TCP-acceptor
tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;)
{
boost::system::error_code ec;
// create new publisher-session
// this instance will be associated with one publisher
publisher_session_ptr new_publisher_session =
boost::make_shared<publisher_session>( boost::ref( io_service), boost::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (publisher)
// is connected
acceptor.async_accept(
new_publisher_session->socket(),
yield[ec]);
if ( ! ec) {
// run the new publisher in its own fiber (one fiber for one client)
boost::fibers::asio::spawn( io_service,
boost::bind( & publisher_session::run, new_publisher_session, _1) );
}
}
}
// function accepts connections requests from clients acting as a subscriber
void accept_subscriber( boost::asio::io_service& io_service,
unsigned short port,
registry & reg,
boost::fibers::asio::yield_context yield)
{
// create TCP-acceptor
tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;)
{
boost::system::error_code ec;
// create new subscriber-session
// this instance will be associated with one subscriber
subscriber_session_ptr new_subscriber_session =
boost::make_shared<subscriber_session>( boost::ref( io_service), boost::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (subscriber)
// is connected
acceptor.async_accept(
new_subscriber_session->socket(),
yield[ec]);
if ( ! ec) {
// run the new subscriber in its own fiber (one fiber for one client)
boost::fibers::asio::spawn( io_service,
boost::bind( & subscriber_session::run, new_subscriber_session, _1) );
}
}
}
int main( int argc, char* argv[])
{
try
{
// create io_service for async. I/O
boost::asio::io_service io_service;
// registry for channels and its subscription
registry reg;
// create an acceptor for publishers, run it as fiber
boost::fibers::asio::spawn( io_service,
boost::bind( accept_publisher,
boost::ref( io_service), 9997, boost::ref( reg), _1) );
// create an acceptor for subscribersm, run it as fiber
boost::fibers::asio::spawn( io_service,
boost::bind( accept_subscriber,
boost::ref( io_service), 9998, boost::ref( reg), _1) );
boost::fibers::fiber f(
boost::bind( boost::fibers::asio::run_service, boost::ref( io_service) ) );
f.join();
}
catch ( std::exception const& e)
{ std::cerr << "Exception: " << e.what() << "\n"; }
return 0;
}

View File

@@ -1,64 +0,0 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
enum { max_length = 1024 };
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: subscriber <host> <channel>\n";
return 1;
}
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), argv[1], "9998");
tcp::resolver::iterator iterator = resolver.resolve(query);
tcp::socket s(io_service);
boost::asio::connect(s, iterator);
char msg[max_length];
std::string channel(argv[2]);
std::memset(msg, '\0', max_length);
std::memcpy(msg, channel.c_str(), channel.size() );
boost::asio::write(s, boost::asio::buffer(msg, max_length));
for (;;)
{
char reply[max_length];
size_t reply_length = s.read_some(
boost::asio::buffer(reply, max_length));
std::cout << "published: ";
std::cout.write(reply, reply_length);
std::cout << std::endl;
}
}
catch (std::exception const& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -1,239 +0,0 @@
//
// spawn.hpp
// ~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_FIBERS_ASIO_SPAWN_HPP
#define BOOST_FIBERS_ASIO_SPAWN_HPP
#include <boost/asio/detail/config.hpp>
#include <boost/asio/detail/weak_ptr.hpp>
#include <boost/asio/detail/wrapped_handler.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <boost/move/move.hpp>
#include <boost/fiber/attributes.hpp>
#include <boost/fiber/detail/fiber_base.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
/// Context object the represents the currently executing fiber.
/**
* The basic_yield_context class is used to represent the currently executing
* fiber. A basic_yield_context may be passed as a handler to an * asynchronous
* operation. For example:
*
* @code template< typename Handler >
* void my_fiber( basic_yield_context< Handler > yield)
* {
* ...
* std::size_t n = my_socket.async_read_some( buffer, yield);
* ...
* } @endcode
*
* The initiating function (async_read_some in the above example) suspends the
* current fiber. The fiber is resumed when the asynchronous operation
* completes, and the result of the operation is returned.
*/
template< typename Handler >
class basic_yield_context
{
public:
/// Construct a yield context to represent the specified fiber.
/**
* Most applications do not need to use this constructor. Instead, the
* spawn() function passes a yield context as an argument to the fiber
* function.
*/
basic_yield_context(
boost::fibers::detail::fiber_base * fib,
Handler& handler) :
fiber_( fib),
handler_( handler),
ec_( 0)
{}
/// Return a yield context that sets the specified error_code.
/**
* By default, when a yield context is used with an asynchronous operation, a
* non-success error_code is converted to system_error and thrown. This
* operator may be used to specify an error_code object that should instead be
* set with the asynchronous operation's result. For example:
*
* @code template< typename Handler >
* void my_fiber( basic_yield_context< Handler > yield)
* {
* ...
* std::size_t n = my_socket.async_read_some( buffer, yield[ec]);
* if ( ec)
* {
* // An error occurred.
* }
* ...
* } @endcode
*/
basic_yield_context operator[]( boost::system::error_code & ec)
{
basic_yield_context tmp( * this);
tmp.ec_ = & ec;
return tmp;
}
#if defined(GENERATING_DOCUMENTATION)
private:
#endif // defined(GENERATING_DOCUMENTATION)
boost::fibers::detail::fiber_base * fiber_;
Handler & handler_;
boost::system::error_code * ec_;
};
#if defined(GENERATING_DOCUMENTATION)
/// Context object the represents the currently executing fiber.
typedef basic_yield_context< unspecified > yield_context;
#else // defined(GENERATING_DOCUMENTATION)
typedef basic_yield_context<
boost::asio::detail::wrapped_handler<
boost::asio::io_service::strand, void(*)(),
boost::asio::detail::is_continuation_if_running> > yield_context;
#endif // defined(GENERATING_DOCUMENTATION)
/**
* @defgroup spawn boost::fibers::asio::spawn
*
* @brief Start a new stackful fiber.
*
* The spawn() function is a high-level wrapper over the Boost.Fiber
* library. This function enables programs to implement asynchronous logic in a
* synchronous manner, as illustrated by the following example:
*
* @code boost::asio::spawn( my_strand, do_echo);
*
* // ...
*
* void do_echo( boost::fibers::asio::yield_context yield)
* {
* try
* {
* char data[128];
* for (;;)
* {
* std::size_t length =
* my_socket.async_read_some(
* boost::asio::buffer( data), yield);
*
* boost::asio::async_write( my_socket,
* boost::asio::buffer( data, length), yield);
* }
* }
* catch ( std::exception const& e)
* {
* // ...
* }
* } @endcode
*/
/*@{*/
/// Start a new fiber, calling the specified handler when it completes.
/**
* This function is used to launch a new fiber.
*
* @param handler A handler to be called when the fiber exits. More
* importantly, the handler provides an execution context (via the handler
* invocation hook) for the fiber. The handler must have the signature:
* @code void handler(); @endcode
*
* @param function The fiber function. The function must have the signature:
* @code void function( basic_yield_context< Handler > yield); @endcode
*
* @param attributes Boost.Fiber attributes used to customise the fiber.
*/
template< typename Handler, typename Function >
void spawn( boost::asio::io_service & io_service,
BOOST_ASIO_MOVE_ARG( Handler) handler,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes
= boost::fibers::attributes() );
/// Start a new fiber, inheriting the execution context of another.
/**
* This function is used to launch a new fiber.
*
* @param ctx Identifies the current fiber as a parent of the new
* fiber. This specifies that the new fiber should inherit the
* execution context of the parent. For example, if the parent fiber is
* executing in a particular strand, then the new fiber will execute in the
* same strand.
*
* @param function The fiber function. The function must have the signature:
* @code void function( basic_yield_context< Handler > yield); @endcode
*
* @param attributes Boost.Fiber attributes used to customise the fiber.
*/
template< typename Handler, typename Function >
void spawn( boost::asio::io_service & io_service,
basic_yield_context< Handler > ctx,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes
= boost::fibers::attributes() );
/// Start a new fiber that executes in the contex of a strand.
/**
* This function is used to launch a new fiber.
*
* @param strand Identifies a strand. By starting multiple fibers on the
* same strand, the implementation ensures that none of those fibers can
* execute simultaneously.
*
* @param function The fiber function. The function must have the signature:
* @code void function( yield_context yield); @endcode
*
* @param attributes Boost.Fiber attributes used to customise the fiber.
*/
template< typename Function >
void spawn( boost::asio::io_service::strand strand,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes
= boost::fibers::attributes() );
/// Start a new fiber that executes on a given io_service.
/**
* This function is used to launch a new fiber.
*
* @param io_service Identifies the io_service that will run the fiber. The
* new fiber is implicitly given its own strand within this io_service.
*
* @param function The fiber function. The function must have the signature:
* @code void function( yield_context yield); @endcode
*
* @param attributes Boost.Fiber attributes used to customise the fiber.
*/
template< typename Function >
void spawn( boost::asio::io_service & io_service,
BOOST_ASIO_MOVE_ARG( Function) function,
boost::fibers::attributes const& attributes
= boost::fibers::attributes() );
} // namespace asio
} // namespace fibers
} // namespace boost
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#include "detail/spawn.hpp"
#endif // BOOST_FIBERS_ASIO_SPAWN_HPP

View File

@@ -1,88 +0,0 @@
//
// use_future.hpp
// ~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// modified by Oliver Kowalke
//
#ifndef BOOST_FIBERS_ASIO_USE_FUTURE_HPP
#define BOOST_FIBERS_ASIO_USE_FUTURE_HPP
#include <memory>
#include <boost/config.hpp>
#include <boost/asio/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
/// Class used to specify that an asynchronous operation should return a future.
/**
* The use_future_t class is used to indicate that an asynchronous operation
* should return a boost::fibers::future object. A use_future_t object may be passed as a
* handler to an asynchronous operation, typically using the special value @c
* boost::asio::use_future. For example:
*
* @code boost::fibers::future<std::size_t> my_future
* = my_socket.async_read_some(my_buffer, boost::fibers::asio::use_future); @endcode
*
* The initiating function (async_read_some in the above example) returns a
* future that will receive the result of the operation. If the operation
* completes with an error_code indicating failure, it is converted into a
* system_error and passed back to the caller via the future.
*/
template< typename Allocator = std::allocator< void > >
class use_future_t
{
public:
typedef Allocator allocator_type;
/// Construct using default-constructed allocator.
BOOST_CONSTEXPR use_future_t()
{}
/// Construct using specified allocator.
explicit use_future_t( Allocator const& allocator) :
allocator_( allocator)
{}
/// Specify an alternate allocator.
template< typename OtherAllocator >
use_future_t< OtherAllocator > operator[]( OtherAllocator const& allocator) const
{ return use_future_t< OtherAllocator >( allocator); }
/// Obtain allocator.
allocator_type get_allocator() const
{ return allocator_; }
private:
Allocator allocator_;
};
/// A special value, similar to std::nothrow.
/**
* See the documentation for boost::asio::use_future_t for a usage example.
*/
BOOST_CONSTEXPR_OR_CONST use_future_t<> use_future;
} // namespace asio
} // namespace fibers
} // namespace boost
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#include "detail/use_future.hpp"
#endif // BOOST_FIBERS_ASIO_USE_FUTURE_HPP

View File

@@ -1,56 +0,0 @@
//
// yield.hpp
// ~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// modified by Oliver Kowalke
//
#ifndef BOOST_FIBERS_ASIO_YIELD_HPP
#define BOOST_FIBERS_ASIO_YIELD_HPP
#include <memory>
#include <boost/asio/detail/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
class yield_t
{
public:
BOOST_CONSTEXPR yield_t() :
ec_( 0)
{}
yield_t operator[]( boost::system::error_code & ec) const
{
yield_t tmp;
tmp.ec_ = & ec;
return tmp;
}
//private:
boost::system::error_code * ec_;
};
BOOST_CONSTEXPR_OR_CONST yield_t yield;
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#include "detail/yield.hpp"
#endif // BOOST_FIBERS_ASIO_YIELD_HPP

View File

@@ -1,92 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <boost/assert.hpp>
#include <boost/ref.hpp>
#include <boost/thread.hpp>
#include <boost/fiber/all.hpp>
int value1 = 0;
int value2 = 0;
inline
void fn1( boost::fibers::barrier & b)
{
boost::fibers::fiber::id id = boost::this_fiber::get_id();
std::cout << "fiber " << id << ": fn1 entered" << std::endl;
++value1;
std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl;
boost::this_fiber::yield();
std::cout << "fiber " << id << ": waits for barrier" << std::endl;
b.wait();
std::cout << "fiber " << id << ": passed barrier" << std::endl;
++value1;
std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl;
boost::this_fiber::yield();
++value1;
std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl;
boost::this_fiber::yield();
++value1;
std::cout << "fiber " << id << ": incremented value1: " << value1 << std::endl;
boost::this_fiber::yield();
std::cout << "fiber " << id << ": fn1 returns" << std::endl;
}
inline
void fn2( boost::fibers::barrier & b)
{
boost::fibers::fiber::id id = boost::this_fiber::get_id();
std::cout << "fiber " << id << ": fn2 entered" << std::endl;
++value2;
std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl;
boost::this_fiber::yield();
++value2;
std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl;
boost::this_fiber::yield();
++value2;
std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl;
boost::this_fiber::yield();
std::cout << "fiber " << id << ": waits for barrier" << std::endl;
b.wait();
std::cout << "fiber " << id << ": passed barrier" << std::endl;
++value2;
std::cout << "fiber " << id << ": incremented value2: " << value2 << std::endl;
boost::this_fiber::yield();
std::cout << "fiber " << id << ": fn2 returns" << std::endl;
}
int main()
{
try
{
boost::fibers::barrier fb( 2);
boost::fibers::fiber f1( boost::bind( & fn1, boost::ref( fb) ) );
boost::fibers::fiber f2( boost::bind( & fn2, boost::ref( fb) ) );
f1.join();
f2.join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,45 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/fiber/all.hpp>
inline
int fn( std::string const& str, int n)
{
for ( int i = 0; i < n; ++i)
{
std::cout << i << ": " << str << std::endl;
boost::this_fiber::yield();
}
return n;
}
void start()
{
boost::fibers::future< int > fi(
boost::fibers::async(
boost::bind( fn, "abc", 5) ) );
fi.wait();
std::cout << "fn() returned " << fi.get() << std::endl;
}
int main()
{
try
{
boost::fibers::fiber( start).join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,51 +0,0 @@
// (C) Copyright 2013 Oliver Kowalke
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <utility>
#include <memory>
#include <stdexcept>
#include <string>
#include <boost/assert.hpp>
#include <boost/bind.hpp>
#include <boost/fiber/all.hpp>
#include <boost/move/move.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
typedef boost::shared_ptr< boost::fibers::packaged_task< int() > > packaged_task_t;
int fn( int i)
{ return i; }
void exec( packaged_task_t pt)
{
boost::fibers::fiber( boost::move( * pt) ).join();
}
boost::fibers::future< int > async( int i)
{
packaged_task_t pt(
new boost::fibers::packaged_task< int() >(
boost::bind( fn, i) ) );
boost::fibers::future< int > f( pt->get_future() );
boost::thread( boost::bind( exec, pt) ).detach();
return boost::move( f);
}
int main( int argc, char * argv[])
{
for ( int i = 0; i < 5; ++i)
{
int n = 3;
boost::fibers::future< int > f = async( n);
int result = f.get();
BOOST_ASSERT( n == result);
std::cout << "result == " << result << std::endl;
}
return 0;
}

View File

@@ -1,49 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/fiber/all.hpp>
struct condition_test_data
{
condition_test_data() : notified(0), awoken(0) { }
boost::fibers::mutex mutex;
boost::fibers::condition condition;
int notified;
int awoken;
};
void condition_test_fiber(condition_test_data* data)
{
boost::unique_lock<boost::fibers::mutex> lock(data->mutex);
while (!(data->notified > 0))
data->condition.wait(lock);
data->awoken++;
}
int main()
{
condition_test_data data;
boost::fibers::fiber f(boost::bind(&condition_test_fiber, &data));
f.interrupt();
try
{
f.join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( boost::fibers::fiber_interrupted const&)
{ std::cerr << "interrupted" << std::endl; }
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,61 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/fiber/all.hpp>
int value1 = 0;
int value2 = 0;
void fn1()
{
boost::fibers::fiber::id id =boost::this_fiber::get_id();
for ( int i = 0; i < 5; ++i)
{
++value1;
std::cout << "fiber " << id << " fn1: increment value1: " << value1 << std::endl;
boost::this_fiber::yield();
}
std::cout << "fiber " << id << " fn1: returns" << std::endl;
}
void fn2( boost::fibers::fiber & f)
{
boost::fibers::fiber::id id =boost::this_fiber::get_id();
for ( int i = 0; i < 5; ++i)
{
++value2;
std::cout << "fiber " << id << " fn2: increment value2: " << value2 << std::endl;
if ( i == 1)
{
boost::fibers::fiber::id id = f.get_id();
std::cout << "fiber " << id << " fn2: joins fiber " << id << std::endl;
f.join();
std::cout << "fiber " << id << " fn2: joined fiber " << id << std::endl;
}
boost::this_fiber::yield();
}
std::cout << "fiber " << id << " fn2: returns" << std::endl;
}
int main()
{
try
{
boost::fibers::fiber f1( fn1);
boost::fibers::fiber f2( boost::bind( fn2, boost::ref( f1) ) );
f2.join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,125 +0,0 @@
// Copyright Oliver Kowalke 2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <cstdio>
#include <sstream>
#include <string>
#include <boost/atomic.hpp>
#include <boost/assert.hpp>
#include <boost/thread.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/all.hpp>
#include "workstealing_round_robin.hpp"
#define MAXCOUNT 10
boost::atomic< bool > fini( false);
boost::fibers::future< int > fibonacci( int);
int fibonacci_( int n)
{
boost::this_fiber::yield();
int res = 1;
if ( 0 != n && 1 != n)
{
boost::fibers::future< int > f1 = fibonacci( n - 1);
boost::fibers::future< int > f2 = fibonacci( n - 2);
res = f1.get() + f2.get();
}
return res;
}
boost::fibers::future< int > fibonacci( int n)
{
boost::fibers::packaged_task< int() > pt( boost::bind( fibonacci_, n) );
boost::fibers::future< int > f( pt.get_future() );
boost::fibers::fiber( boost::move( pt) ).detach();
return boost::move( f);
}
int create_fiber( int n)
{
return fibonacci( n).get();
}
void fn_create_fibers( workstealing_round_robin * ds, boost::barrier * b)
{
boost::fibers::set_scheduling_algorithm( ds);
b->wait();
int n = 10;
int result = boost::fibers::async( boost::bind( create_fiber, n) ).get();
BOOST_ASSERT( 89 == result);
fprintf( stderr, "fibonacci(%d) = %d", n, result);
fini = true;
}
void fn_migrate_fibers( workstealing_round_robin * other_ds, boost::barrier * b, int * count)
{
BOOST_ASSERT( other_ds);
b->wait();
while ( ! fini)
{
// To guarantee progress, we must ensure that
// threads that have work to do are not unreasonably delayed by (thief) threads
// which are idle except for task-stealing.
// This call yields the thief s processor to another thread, allowing
// descheduled threads to regain a processor and make progress.
boost::this_thread::yield();
boost::fibers::fiber f( other_ds->steal() );
if ( f)
{
++( * count);
boost::fibers::migrate( f);
f.join();
}
//boost::this_fiber::yield();
}
}
int main()
{
try
{
for ( int i = 0; i < MAXCOUNT; ++i) {
fprintf(stderr, "%d. ", i);
fini = false;
int count = 0;
workstealing_round_robin * ds = new workstealing_round_robin();
boost::barrier b( 2);
boost::thread t1( boost::bind( fn_create_fibers, ds, &b) );
boost::thread t2( boost::bind( fn_migrate_fibers, ds, &b, &count) );
t1.join();
t2.join();
fprintf(stderr, ", %d fibers stolen\n", count);
delete ds;
}
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,69 +0,0 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include "workstealing_round_robin.hpp"
#include <boost/assert.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
void
workstealing_round_robin::awakened( boost::fibers::detail::fiber_base * f)
{
boost::mutex::scoped_lock lk( mtx_);
rqueue_.push_back( f);
}
boost::fibers::detail::fiber_base *
workstealing_round_robin::pick_next()
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::detail::fiber_base * f = 0;
if ( ! rqueue_.empty() )
{
f = rqueue_.front();
rqueue_.pop_front();
}
return f;
}
void
workstealing_round_robin::priority( boost::fibers::detail::fiber_base * f, int prio) BOOST_NOEXCEPT
{
BOOST_ASSERT( f);
// set only priority to fiber
// round-robin does not respect priorities
f->priority( prio);
}
boost::fibers::fiber
workstealing_round_robin::steal() BOOST_NOEXCEPT
{
boost::mutex::scoped_lock lk( mtx_);
boost::fibers::detail::fiber_base * f = 0;
if ( ! rqueue_.empty() )
{
f = rqueue_.back();
rqueue_.pop_back();
if ( f->thread_affinity() )
{
rqueue_.push_back( f);
f = 0;
}
}
#if 0
if ( 0 != f)
fprintf(stderr, "migrated fiber: %p\n", f);
#endif
return boost::fibers::fiber( f);
}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

View File

@@ -1,51 +0,0 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef WORKSTEALING_ROUND_ROBIN_H
#define WORKSTEALING_ROUND_ROBIN_H
#include <list>
#include <boost/config.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/fiber/all.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
# if defined(BOOST_MSVC)
# pragma warning(push)
# pragma warning(disable:4251 4275)
# endif
class workstealing_round_robin : public boost::fibers::sched_algorithm
{
private:
typedef std::list< boost::fibers::detail::fiber_base * > rqueue_t;
boost::mutex mtx_;
rqueue_t rqueue_;
public:
virtual void awakened( boost::fibers::detail::fiber_base *);
virtual boost::fibers::detail::fiber_base * pick_next();
virtual void priority( boost::fibers::detail::fiber_base *, int) BOOST_NOEXCEPT;
boost::fibers::fiber steal();
};
# if defined(BOOST_MSVC)
# pragma warning(pop)
# endif
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // WORKSTEALING_ROUND_ROBIN_H

View File

@@ -1,90 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/assert.hpp>
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ref.hpp>
#include <boost/optional.hpp>
#include <boost/fiber/all.hpp>
typedef boost::fibers::unbounded_queue< std::string > fifo_t;
inline
void ping( fifo_t & recv_buf, fifo_t & send_buf)
{
boost::fibers::fiber::id id( boost::this_fiber::get_id() );
send_buf.push( std::string("ping") );
std::string value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
value.clear();
send_buf.push( std::string("ping") );
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
value.clear();
send_buf.push( std::string("ping") );
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": ping received: " << value << std::endl;
send_buf.close();
}
inline
void pong( fifo_t & recv_buf, fifo_t & send_buf)
{
boost::fibers::fiber::id id( boost::this_fiber::get_id() );
std::string value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
value.clear();
send_buf.push( std::string("pong") );
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
value.clear();
send_buf.push( std::string("pong") );
value = recv_buf.value_pop();
std::cout << "fiber " << id << ": pong received: " << value << std::endl;
send_buf.push( std::string("pong") );
send_buf.close();
}
int main()
{
try
{
fifo_t buf1, buf2;
boost::fibers::fiber f1(
boost::bind(
& ping, boost::ref( buf1), boost::ref( buf2) ) );
boost::fibers::fiber f2(
boost::bind(
& pong, boost::ref( buf2), boost::ref( buf1) ) );
f1.join();
f2.join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,66 +0,0 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <iostream>
#include <boost/assert.hpp>
#include <boost/fiber/all.hpp>
#include <boost/thread.hpp>
int count = 384;
#ifdef BOOST_MSVC //MS VisualStudio
__declspec(noinline) void access( char *buf);
#else // GCC
void access( char *buf) __attribute__ ((noinline));
#endif
void access( char *buf)
{
buf[0] = '\0';
}
void bar( int i)
{
char buf[4 * 1024];
if ( i > 0)
{
access( buf);
std::cout << i << ". iteration" << std::endl;
bar( i - 1);
}
}
void foo()
{
bar( count);
boost::this_fiber::yield();
}
void thread_fn()
{
{
boost::fibers::fiber f( foo);
f.join();
}
}
int main( int argc, char * argv[])
{
#if defined(BOOST_USE_SEGMENTED_STACKS)
std::cout << "using segmented stacks: allocates " << count << " * 4kB == " << 4 * count << "kB on stack, ";
std::cout << "initial stack size = " << boost::coroutines::stack_allocator::default_stacksize() / 1024 << "kB" << std::endl;
std::cout << "application should not fail" << std::endl;
#else
std::cout << "using standard stacks: allocates " << count << " * 4kB == " << 4 * count << "kB on stack, ";
std::cout << "initial stack size = " << boost::coroutines::stack_allocator::traits_type::default_size() / 1024 << "kB" << std::endl;
std::cout << "application might fail" << std::endl;
#endif
boost::thread( thread_fn).join();
return 0;
}

View File

@@ -1,39 +0,0 @@
#include <cstdlib>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/fiber/all.hpp>
inline
void fn( std::string const& str, int n)
{
for ( int i = 0; i < n; ++i)
{
std::cout << i << ": " << str << std::endl;
boost::this_fiber::yield();
}
}
int main()
{
try
{
boost::fibers::fiber f1( boost::bind( fn, "abc", 5) );
boost::fibers::fiber f2( boost::bind( fn, "xyz", 7) );
f1.join();
f2.join();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}
catch ( std::exception const& e)
{ std::cerr << "exception: " << e.what() << std::endl; }
catch (...)
{ std::cerr << "unhandled exception" << std::endl; }
return EXIT_FAILURE;
}

View File

@@ -1,23 +0,0 @@
#include <string>
#include <boost/bind.hpp>
#include <boost/fiber/all.hpp>
void foo( std::string const& str, int n)
{
for ( int i = 0; i < n; ++i)
{
std::cout << i << ": " << str << std::endl;
boost::this_fiber::yield();
}
}
void bar()
{
boost::fibers::fiber f1( boost::bind( foo, "abc", 5) );
boost::fibers::fiber f2( boost::bind( foo, "xyz", 7) );
f1.join();
f2.join();
}

View File

@@ -1,24 +0,0 @@
#include <string>
#include <boost/bind.hpp>
#include <boost/fiber/all.hpp>
int foo( std::string const& str, int n)
{
for ( int i = 0; i < n; ++i)
{
std::cout << i << ": " << str << std::endl;
boost::this_fiber::yield();
}
return n;
}
void bar()
{
boost::fibers::future< int > fi(
boost::fibers::async(
boost::bind( foo, "abc", 5) ) );
fi.wait();
}