mirror of
https://github.com/boostorg/coroutine.git
synced 2026-02-11 23:52:26 +00:00
336 lines
12 KiB
Plaintext
336 lines
12 KiB
Plaintext
[/
|
|
Copyright Oliver Kowalke 2009.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt
|
|
]
|
|
|
|
[section:intro Introduction]
|
|
|
|
[heading Definition]
|
|
|
|
In computer science routines are defined as a sequence of operations.
|
|
The execution of routines form a parent-child relationship and the child
|
|
terminates always before the parent.
|
|
Coroutines are a generalization of routines.
|
|
The principal difference between coroutines and routines is that a coroutine
|
|
enables explicit suspend and resume their progress via additional operations by
|
|
preserving local state, e.g. a coroutine is a kind of continuation.
|
|
A continuation is a object representing a suspended execution (registers,
|
|
stack). Each coroutine has its own stack and local variables, sub-routine calls
|
|
etc.
|
|
In this sense coroutines are (actually) a language concept.
|
|
|
|
[heading How it works]
|
|
|
|
Functions foo() and bar() are supposed to alternate their execution (leave and
|
|
enter function body).
|
|
|
|
[$../../../../libs/coroutine/doc/images/foo_bar.png [align center]]
|
|
|
|
If coroutines would be called such as routines, the stack would grow with
|
|
every call and will never be degraded. A jump into the middle of a coroutine
|
|
would not be possible, because the return address would have been on top of
|
|
stack entries.
|
|
|
|
The solution is that each coroutine has its own stack and control-block
|
|
(__fcontext__ from __boost_context__).
|
|
Before the coroutine gets suspended, the non-volatile registers (including stack
|
|
and instruction/program pointer) of the currently active coroutine are stored in
|
|
coroutine's control-block.
|
|
The registers of the newly activated coroutine must be restored from its
|
|
associated control-block before it can continue with their work.
|
|
|
|
[$../../../../libs/coroutine/doc/images/foo_bar_seq.png [align center]]
|
|
|
|
The context switch requires no system privileges and provides cooperative
|
|
multitasking on the level of language. Coroutines provide quasi parallelism.
|
|
When a program is supposed to do several things at the same time, coroutines
|
|
help to do this much simpler and more elegant than with only a single flow of
|
|
control.
|
|
Advantages can be seen particularly clearly with the use of a recursive
|
|
function, such as traversal of binary trees (see example 'same fringe').
|
|
|
|
|
|
[heading Example: asio::io_stream with std::stream]
|
|
|
|
This section demonstrates how stackfull coroutines help to use standard C++
|
|
IO-streams together with IO-demultiplexer like __io_service__ (using
|
|
non-blocking IO).
|
|
|
|
int main( int argc, char * argv[])
|
|
{
|
|
...
|
|
{
|
|
boost::asio::io_service io_service;
|
|
io_service.post(
|
|
boost::bind(
|
|
& server::start,
|
|
server::create(
|
|
io_service, port) ) );
|
|
io_service.run();
|
|
}
|
|
...
|
|
}
|
|
|
|
|
|
__server__ accepts connection-requests made by clients, creates for each new
|
|
connection an instance of type __session__ and invokes __start__ on it.
|
|
|
|
class server : public boost::enable_shared_from_this< server >
|
|
{
|
|
private:
|
|
boost::asio::io_service & io_service_;
|
|
boost::asio::ip::tcp::acceptor acceptor_;
|
|
|
|
void handle_accept_( session * new_session, boost::system::error_code const& error)
|
|
{
|
|
if ( ! error)
|
|
{
|
|
// start asynchronous read
|
|
new_session->start();
|
|
|
|
// start asynchronous accept
|
|
start();
|
|
}
|
|
}
|
|
|
|
server( boost::asio::io_service & io_service, short port) :
|
|
io_service_( io_service),
|
|
acceptor_(
|
|
io_service_,
|
|
boost::asio::ip::tcp::endpoint( boost::asio::ip::tcp::v4(), port) )
|
|
{}
|
|
|
|
public:
|
|
typedef boost::shared_ptr< server > ptr_t;
|
|
|
|
static ptr_t create( boost::asio::io_service & io_service, short port)
|
|
{ return ptr_t( new server( io_service, port) ); }
|
|
|
|
void start()
|
|
{
|
|
// create new session which gets started if asynchronous
|
|
// accept completes
|
|
session * new_session( new session( io_service_) );
|
|
acceptor_.async_accept(
|
|
new_session->socket(),
|
|
boost::bind( & server::handle_accept_, this->shared_from_this(),
|
|
new_session, boost::asio::placeholders::error) );
|
|
}
|
|
};
|
|
|
|
|
|
Each __session__ communicates with the connected client and handles the
|
|
requests.
|
|
The application protocol in this example uses TCP-sockets as channel and
|
|
'newline' to separate the messages in the byte stream.
|
|
An application protocol is a set of rules for the order in which messages are
|
|
exchanged.
|
|
['std::istream] is used to extract the messages from the character stream .
|
|
Message 'exit' terminates the session.
|
|
|
|
class session : private boost::noncopyable
|
|
{
|
|
private:
|
|
void handle_read_( coro_t::caller_type & self)
|
|
{
|
|
// create stream-buffer reading from socket
|
|
inbuf buf( socket_);
|
|
std::istream s( & buf);
|
|
|
|
// messages are separated by 'newline'
|
|
std::string msg;
|
|
std::getline( s, msg);
|
|
std::cout << msg << std::endl;
|
|
|
|
// terminate session for message 'exit'
|
|
// else do asynchronous read
|
|
if ( "exit" == msg)
|
|
io_service_.post(
|
|
boost::bind(
|
|
& session::destroy_, this) );
|
|
else
|
|
start();
|
|
}
|
|
|
|
void destroy_()
|
|
{ delete this; }
|
|
|
|
boost::asio::io_service & io_service_;
|
|
boost::asio::ip::tcp::socket socket_;
|
|
|
|
public:
|
|
session( boost::asio::io_service & io_service) :
|
|
io_service_( io_service),
|
|
socket_( io_service_)
|
|
{ std::cout << "service(): " << socket_.remote_endpoint() << std::endl; }
|
|
|
|
~session()
|
|
{ std::cout << "~service(): " << socket_.remote_endpoint() << std::endl; }
|
|
|
|
boost::asio::ip::tcp::socket & socket()
|
|
{ return socket_; }
|
|
|
|
void start()
|
|
{
|
|
// register on io_service for asynchronous read
|
|
io_service_.async_read(
|
|
socket_,
|
|
boost::bind(
|
|
& session::handle_read_, this->shared_from_this(), _1, _2) );
|
|
}
|
|
};
|
|
|
|
|
|
Function __getline__ returns only if a 'newline' was read from the socket.
|
|
Therefore the application will block until 'newline' is received by the socket.
|
|
The stream-buffer used by the stream maintains an internal buffer which gets
|
|
(re-)filled by its function __underflow__. __underflow__ does the
|
|
read-operation on the socket. The C++ IO-streams framework does not provide an
|
|
easy way to create an continuation which represents reading bytes from the socket.
|
|
|
|
|
|
Coroutines help in this case to make the application non-blocking even if no
|
|
'newline' was received.
|
|
Class ['session] creates a coroutine which uses __handle_read__ as
|
|
__coro_fn__. On a new created ['session] ['start()] called starting the
|
|
coroutine. In the __coro_fn__ __handle_read__ the messages are received
|
|
via __getline__ in a loop until 'exit' is delivered.
|
|
|
|
class session : private boost::noncopyable
|
|
{
|
|
private:
|
|
void handle_read_( coro_t::caller_type & ca)
|
|
{
|
|
// create stream-buffer with coroutine
|
|
inbuf buf( socket_, coro_, ca);
|
|
std::istream s( & buf);
|
|
|
|
std::string msg;
|
|
do
|
|
{
|
|
// read message
|
|
// we not block if no newline was received yet
|
|
std::getline( s, msg);
|
|
std::cout << msg << std::endl;
|
|
} while ( msg != "exit");
|
|
io_service_.post(
|
|
boost::bind(
|
|
& session::destroy_, this) );
|
|
}
|
|
|
|
void destroy_()
|
|
{ delete this; }
|
|
|
|
coro_t coro_;
|
|
boost::asio::io_service & io_service_;
|
|
boost::asio::ip::tcp::socket socket_;
|
|
|
|
public:
|
|
session( boost::asio::io_service & io_service) :
|
|
coro_(),
|
|
io_service_( io_service),
|
|
socket_( io_service_)
|
|
{ std::cout << "service(): " << socket_.remote_endpoint() << std::endl; }
|
|
|
|
~session()
|
|
{ std::cout << "~service(): " << socket_.remote_endpoint() << std::endl; }
|
|
|
|
boost::asio::ip::tcp::socket & socket()
|
|
{ return socket_; }
|
|
|
|
void start()
|
|
{
|
|
// create and start a coroutine
|
|
// handle_read_() is used as coroutine-function
|
|
coro_ = coro_t( boost::bind( & session::handle_read_, this, _1) );
|
|
}
|
|
};
|
|
|
|
|
|
The stream-buffer is created with __coro_caller__ and handles suspend/resume of this
|
|
code path depending on if bytes can be read from the socket.
|
|
|
|
class inbuf : public std::streambuf,
|
|
private boost::noncopyable
|
|
{
|
|
private:
|
|
static const std::streamsize pb_size;
|
|
|
|
enum
|
|
{ bf_size = 16 };
|
|
|
|
int fetch_()
|
|
{
|
|
std::streamsize num = std::min(
|
|
static_cast< std::streamsize >( gptr() - eback() ), pb_size);
|
|
|
|
std::memmove(
|
|
buffer_ + ( pb_size - num),
|
|
gptr() - num, num);
|
|
|
|
// read bytes from the socket into internal buffer 'buffer_'
|
|
// make coro_t::operator() as callback, invoked if some
|
|
// bytes are read into 'buffer_'
|
|
s_.async_read_some(
|
|
boost::asio::buffer( buffer_ + pb_size, bf_size - pb_size),
|
|
boost::bind( & coro_t::operator(), & coro_, _1, _2) );
|
|
// suspend this coroutine
|
|
ca_();
|
|
|
|
// coroutine was resumed by boost::asio::io_sevice
|
|
boost::system::error_code ec;
|
|
std::size_t n = 0;
|
|
|
|
// check arguments
|
|
boost::tie( ec, n) = ca_.get();
|
|
|
|
// check if an error occurred
|
|
if ( ec)
|
|
{
|
|
setg( 0, 0, 0);
|
|
return -1;
|
|
}
|
|
|
|
setg( buffer_ + pb_size - num, buffer_ + pb_size, buffer_ + pb_size + n);
|
|
return n;
|
|
}
|
|
|
|
boost::asio::ip::tcp::socket & s_;
|
|
coro_t & coro_;
|
|
coro_t::caller_type & ca_;
|
|
char buffer_[bf_size];
|
|
|
|
protected:
|
|
virtual int underflow()
|
|
{
|
|
if ( gptr() < egptr() )
|
|
return traits_type::to_int_type( * gptr() );
|
|
|
|
if ( 0 > fetch_() )
|
|
return traits_type::eof();
|
|
else
|
|
return traits_type::to_int_type( * gptr() );
|
|
}
|
|
|
|
public:
|
|
inbuf(
|
|
boost::asio::ip::tcp::socket & s,
|
|
coro_t & coro,
|
|
coro_t::caller_type & ca) :
|
|
s_( s), coro_( coro), ca_( ca), buffer_()
|
|
{ setg( buffer_ + 4, buffer_ + 4, buffer_ + 4); }
|
|
};
|
|
const std::streamsize inbuf::pb_size = 4;
|
|
|
|
|
|
__fetch__ uses __coro_op__ as callback for the asynchronous read-operation on the
|
|
socket and suspends itself (['ca_()] jumps back to __start__). If some bytes are
|
|
available in the socket receive buffer __io_service__ copies the bytes to the
|
|
internal buffer ['buffer_] and invokes the callback which resumes the coroutine
|
|
(['ca_()] returns).
|
|
|
|
|
|
[endsect]
|