Boost C++ Libraries Home Libraries People FAQ More

PrevUpHomeNext

Integrating Fibers with Asynchronous Callbacks

Overview

One of the primary benefits of Boost.Fiber is the ability to use asynchronous operations for efficiency, while at the same time structuring the calling code as if the operations were synchronous. Asynchronous operations provide completion notification in a variety of ways, but most involve a callback function of some kind. This section discusses tactics for interfacing Boost.Fiber with an arbitrary async operation.

For purposes of illustration, consider the following hypothetical API:

class AsyncAPI {
public:
    // constructor acquires some resource that can be read and written
    AsyncAPI();

    // callbacks accept an int error code; 0 == success
    typedef int errorcode;

    // write callback only needs to indicate success or failure
    void init_write( std::string const& data,
                     std::function< void( errorcode) > const& callback);

    // read callback needs to accept both errorcode and data
    void init_read( std::function< void( errorcode, std::string const&) > const&);

    // ... other operations ...
};

The significant points about each of init_write() and init_read() are:

We would like to wrap these asynchronous methods in functions that appear synchronous by blocking the calling fiber until the operation completes. This lets us use the wrapper function's return value to deliver relevant data.

[Tip] Tip

promise<> and future<> are your friends here.

Return Errorcode

The AsyncAPI::init_write() callback passes only an errorcode. If we simply want the blocking wrapper to return that errorcode, this is an extremely straightforward use of promise<> and future<>:

AsyncAPI::errorcode write_ec( AsyncAPI & api, std::string const& data) {
    boost::fibers::promise< AsyncAPI::errorcode > promise;
    boost::fibers::future< AsyncAPI::errorcode > future( promise.get_future() );
    // We can confidently bind a reference to local variable 'promise' into
    // the lambda callback because we know for a fact we're going to suspend
    // (preserving the lifespan of both 'promise' and 'future') until the
    // callback has fired.
    api.init_write( data,
                    [&promise]( AsyncAPI::errorcode ec){
                        promise.set_value( ec);
                    });
    return future.get();
}

All we have to do is:

  1. Instantiate a promise<> of correct type.
  2. Obtain its future<>.
  3. Arrange for the callback to call promise::set_value().
  4. Block on future::get().
[Note] Note

This tactic for resuming a pending fiber works even if the callback is called on a different thread than the one on which the initiating fiber is running. In fact, the example program's dummy AsyncAPI implementation illustrates that: it simulates async I/O by launching a new thread that sleeps briefly and then calls the relevant callback.

Success or Exception

A wrapper more aligned with modern C++ practice would use an exception, rather than an errorcode, to communicate failure to its caller. This is straightforward to code in terms of write_ec():

void write( AsyncAPI & api, std::string const& data) {
    AsyncAPI::errorcode ec = write_ec( api, data);
    if ( ec) {
        throw make_exception("write", ec);
    }
}

The point is that since each fiber has its own stack, you need not repeat messy boilerplate: normal encapsulation works.

Return Errorcode or Data

Things get a bit more interesting when the async operation's callback passes multiple data items of interest. One approach would be to use std::pair<> to capture both:

std::pair< AsyncAPI::errorcode, std::string > read_ec( AsyncAPI & api) {
    typedef std::pair< AsyncAPI::errorcode, std::string > result_pair;
    boost::fibers::promise< result_pair > promise;
    boost::fibers::future< result_pair > future( promise.get_future() );
    // We promise that both 'promise' and 'future' will survive until our
    // lambda has been called.
    api.init_read( [&promise]( AsyncAPI::errorcode ec, std::string const& data){
                       promise.set_value( result_pair( ec, data) );
                   });
    return future.get();
}

Once you bundle the interesting data in std::pair<>, the code is effectively identical to write_ec(). You can call it like this:

std::tie( ec, data) = read_ec( api);

Data or Exception

But a more natural API for a function that obtains data is to return only the data on success, throwing an exception on error.

As with write() above, it's certainly possible to code a read() wrapper in terms of read_ec(). But since a given application is unlikely to need both, let's code read() from scratch, leveraging promise::set_exception():

std::string read( AsyncAPI & api) {
    boost::fibers::promise< std::string > promise;
    boost::fibers::future< std::string > future( promise.get_future() );
    // Both 'promise' and 'future' will survive until our lambda has been
    // called.
    api.init_read( [&promise]( AsyncAPI::errorcode ec, std::string const& data){
                       if ( ! ec) {
                           promise.set_value( data);
                       } else {
                           promise.set_exception(
                                   std::make_exception_ptr(
                                       make_exception("read", ec) ) );
                       }
                   });
    return future.get();
}

future::get() will do the right thing, either returning std::string or throwing an exception.

Success/Error Virtual Methods

One classic approach to completion notification is to define an abstract base class with success() and error() methods. Code wishing to perform async I/O must derive a subclass, override each of these methods and pass the async operation a pointer to a subclass instance. The abstract base class might look like this:

// every async operation receives a subclass instance of this abstract base
// class through which to communicate its result
struct Response {
    typedef std::shared_ptr< Response > ptr;

    // called if the operation succeeds
    virtual void success( std::string const& data) = 0;

    // called if the operation fails
    virtual void error( AsyncAPIBase::errorcode ec) = 0;
};

Now the AsyncAPI operation might look more like this:

// derive Response subclass, instantiate, pass Response::ptr
void init_read( Response::ptr);

We can address this by writing a one-size-fits-all PromiseResponse:

class PromiseResponse: public Response {
public:
    // called if the operation succeeds
    virtual void success( std::string const& data) {
        promise_.set_value( data);
    }

    // called if the operation fails
    virtual void error( AsyncAPIBase::errorcode ec) {
        promise_.set_exception(
                std::make_exception_ptr(
                    make_exception("read", ec) ) );
    }

    boost::fibers::future< std::string > get_future() {
        return promise_.get_future();
    }

private:
    boost::fibers::promise< std::string >   promise_;
};

Now we can simply obtain the future<> from that PromiseResponse and wait on its get():

std::string read( AsyncAPI & api) {
    // Because init_read() requires a shared_ptr, we must allocate our
    // ResponsePromise on the heap, even though we know its lifespan.
    auto promisep( std::make_shared< PromiseResponse >() );
    boost::fibers::future< std::string > future( promisep->get_future() );
    // Both 'promisep' and 'future' will survive until our lambda has been
    // called.
    api.init_read( promisep);
    return future.get();
}

The source code above is found in adapt_callbacks.cpp and adapt_method_calls.cpp.

Then There's Boost.Asio

Since the simplest form of Boost.Asio asynchronous operation completion token is a callback function, we could apply the same tactics for Asio as for our hypothetical AsyncAPI asynchronous operations.

Fortunately we need not. Boost.Asio incorporates a mechanism by which the caller can customize the notification behavior of every async operation. Therefore we can construct a completion token which, when passed to a Boost.Asio async operation, requests blocking for the calling fiber. The underlying implementation uses the same mechanism as described above.

boost::fibers::asio::yield is such a completion token. yield is an instance of yield_t:

[fibers_asio_yield]

which is a promise_completion_token:

[fibers_asio_yield_t]

promise_completion_token is common to both yield and use_future. (The interested reader is encouraged to learn more about use_future in example source code.)

promise_completion_token is in fact only a placeholder, a way to trigger Boost.Asio customization. It can bind a custom allocator or boost::system::error_code for use by the actual handler.

[fibers_asio_promise_completion_token]

Asio customization is engaged by specializing boost::asio::handler_type<> for yield_t:

[asio_handler_type]

(There are actually four different specializations in detail/yield.hpp, one for each of the four Asio async callback signatures we expect to have to support.)

The above directs Asio to use yield_handler as the actual handler for an async operation to which yield is passed.

yield_handler is simply an alias for promise_handler, because promise_handler is shared with the use_future machinery:

[fibers_asio_yield_handler]

promise_handler isa promise_handler_base:

[fibers_asio_promise_handler_base]

As promised, promise_handler_base binds a promise<> of appropriate type. (We store a shared_ptr< promise< T > > because the promise_handler instance is copied on its way into underlying Asio machinery.)

Asio, having consulted the handler_type<> traits specialization, instantiates a yield_handler (aka promise_handler) as the async operation's callback:

[fibers_asio_promise_handler]

Like the lambda callback in our read(AsyncAPI&) presented earlier, promise_handler::operator()() either calls promise::set_value() or promise::set_exception() (via promise_handler_base::should_set_value()).

The source code above is found in yield.hpp, promise_completion_token.hpp, detail/yield.hpp and detail/promise_handler.hpp.


PrevUpHomeNext