[/ Copyright Oliver Kowalke 2013. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt ] [section:channels Channels] __boost_fiber__ provides a bounded and an unbounded channel suitable to synchronize fibers via message passing. typedef boost::fibers::unbounded_channel< int > channel_t; void send( channel_t & channel) { for ( int i = 0; i < 5; ++i) { channel.push( i); } channel.close(); } void recv( channel_t & channel) { int i; while ( boost::fibers::channel_op_status::success == channel.pop(i) ) { std::cout << "received " << i << std::endl; } } channel_t channel; boost::fibers::fiber f1( std::bind( send, ref( channel) ) ); boost::fibers::fiber f2( std::bind( recv, ref( channel) ) ); f1.join(); f2.join(); [#class_channel_op_status] [heading Enumeration `channel_op_status`] Channel operations return the state of the channel. enum class channel_op_status { success, empty, full, closed, timeout }; [heading `success`] [variablelist [[Effects:] [Operation was successful.]] ] [heading `empty`] [variablelist [[Effects:] [channel is empty, operation failed.]] ] [heading `full`] [variablelist [[Effects:] [channel is full, operation failed.]] ] [heading `closed`] [variablelist [[Effects:] [channel is closed, operation failed.]] ] [heading `timeout`] [variablelist [[Effects:] [The operation did not become ready before the specified timeout elapsed.]] ] [template_heading unbounded_channel] #include <boost/fiber/unbounded_channel.hpp> template< typename T > class unbounded_channel { public: typedef T value_type; unbounded_channel( unbounded_channel const& other) = delete; unbounded_channel & operator=( unbounded_channel const& other) = delete; bool is_closed() const; void close(); bool is_empty(); void push( value_type const& va); void push( value_type && va); channel_op_status pop( value_type & va); value_type value_pop(); channel_op_status try_pop( value_type & va); template< typename Rep, typename Period > channel_op_status 
pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); }; [member_heading unbounded_channel..is_closed] bool is_closed() const; [variablelist [[Returns:] [`true` if channel has been closed.]] [[Throws:] [Nothing.]] [[Note:] [The channel is not closed by default.]] ] [member_heading unbounded_channel..close] void close(); [variablelist [[Effects:] [Deactivates the channel. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. It informs waiting consumers that no more values will arrive.]] ] [member_heading unbounded_channel..is_empty] bool is_empty(); [variablelist [[Effects:] [Returns `true` if the channel currently contains no data.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient. An `is_empty()` channel can become non-empty.]] ] [member_heading unbounded_channel..push] void push( value_type const& va); void push( value_type && va); [variablelist [[Effects:] [Enchannels the value in the channel and wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()`.]] [[Throws:] [Nothing.]] ] [member_heading unbounded_channel..pop] channel_op_status pop( value_type & va); [variablelist [[Effects:] [Dechannels a value from the channel. If the channel `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value) or the channel gets `close()`d (return value `closed`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading unbounded_channel..value_pop] value_type value_pop(); [variablelist [[Effects:] [Dechannels a value from the channel. 
If the channel `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed or the channel gets `close()`d (which throws an exception).]] [[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]] ] [member_heading unbounded_channel..try_pop] channel_op_status try_pop( value_type & va); [variablelist [[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`, returns `closed`. Otherwise it returns `success` and `va` contains the dechanneled value.]] [[Throws:] [Nothing.]] ] [member_heading unbounded_channel..pop_wait_for] template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a `clock_type::time_point` as `(clock_type::now() + timeout_duration)`. If `(! this->is_empty())`, immediately dechannels a value from the channel. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value), or the channel gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the computed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading unbounded_channel..pop_wait_until] template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); [variablelist [[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`. If `(! this->is_empty())`, immediately dechannels a value from the channel. 
Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value), or the channel gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the passed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [template_heading bounded_channel] #include <boost/fiber/bounded_channel.hpp> template< typename T > class bounded_channel { public: typedef T value_type; bounded_channel( bounded_channel const& other) = delete; bounded_channel & operator=( bounded_channel const& other) = delete; bounded_channel( std::size_t wm); bounded_channel( std::size_t hwm, std::size_t lwm); std::size_t upper_bound() const; std::size_t lower_bound() const; bool is_closed() const; void close(); bool is_empty() const; bool is_full() const; channel_op_status push( value_type const& va); channel_op_status push( value_type && va); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type const& va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& va, std::chrono::time_point< Clock, Duration > const& timeout_time); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && va, std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); value_type value_pop(); channel_op_status pop( value_type & va); template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, 
std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_pop( value_type & va); }; [heading Constructor] bounded_channel( std::size_t wm); bounded_channel( std::size_t hwm, std::size_t lwm); [variablelist [[Preconditions:] [`hwm >= lwm`]] [[Effects:] [Constructs an object of class `bounded_channel`. The constructor with two arguments constructs an object of class `bounded_channel` with a high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor with one argument effectively sets both `hwm` and `lwm` to the same value `wm`.]] [[Throws:] [`invalid_argument` if `lwm > hwm`.]] [[Notes:] [Once the number of values in the channel reaches `hwm`, any call to `push()`, `push_wait_for()` or `push_wait_until()` will block until the number of values in the channel has dropped below `lwm`. That is, if `lwm < hwm`, the channel can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()` calls will block (`is_full()` returns `true`) even though the number of values in the channel is less than `hwm`.]] ] [member_heading bounded_channel..upper_bound] std::size_t upper_bound() const; [variablelist [[Returns:] [the high-watermark with which `*this` was constructed.]] [[Throws:] [Nothing.]] ] [member_heading bounded_channel..lower_bound] std::size_t lower_bound() const; [variablelist [[Returns:] [the low-watermark with which `*this` was constructed.]] [[Throws:] [Nothing.]] ] [member_heading bounded_channel..is_closed] bool is_closed() const; [variablelist [[Returns:] [`true` if channel has been closed.]] [[Throws:] [Nothing.]] [[Note:] [The channel is not closed by default.]] ] [member_heading bounded_channel..close] void close(); [variablelist [[Effects:] [Deactivates the channel. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. 
It informs waiting consumers that no more values will arrive.]] ] [member_heading bounded_channel..is_empty] bool is_empty() const; [variablelist [[Effects:] [Returns `true` if the channel currently contains no data.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient. An `is_empty()` channel can become non-empty.]] ] [member_heading bounded_channel..is_full] bool is_full() const; [variablelist [[Effects:] [Returns `true` if the channel cannot accept more data at present. This happens when the number of values in the channel reaches `wm` or `hwm`. Once the channel becomes full, however, it continues refusing new values until the number of values drops below `lwm`.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient.]] ] [member_heading bounded_channel..push] channel_op_status push( value_type const& va); channel_op_status push( value_type && va); [variablelist [[Effects:] [If `this->is_closed()`, returns `closed`. If `(! this->is_full())`, enchannels the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the fiber gets suspended until the number of values in the channel drops below `lwm` (return value `success`), or the channel is `close()`d (return value `closed`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..push_wait_for] template< typename Rep, typename Period > channel_op_status push_wait_for( value_type const& va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration); [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a time_point as `(now() + timeout_duration)`. If `this->is_closed()`, returns `closed`. If `(! 
this->is_full())`, enchannels the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the calling fiber is suspended until the number of values in the channel drops below `lwm` (return value `success`), the channel is `close()`d (return value `closed`), or the time as reported by `now()` reaches the computed time_point (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..push_wait_until] template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& va, std::chrono::time_point< Clock, Duration > const& timeout_time); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && va, std::chrono::time_point< Clock, Duration > const& timeout_time); [variablelist [[Effects:] [Accepts an absolute `timeout_time` in any supported time_point type. If `this->is_closed()`, returns `closed`. If `(! this->is_full())`, enchannels the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the calling fiber is suspended until the number of values in the channel drops below `lwm` (return value `success`), the channel is `close()`d (return value `closed`), or the time as reported by `now()` reaches the passed time_point (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..try_push] channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); [variablelist [[Effects:] [If `this->is_full()`, returns `full`. If `this->is_closed()`, returns `closed`. 
Otherwise enchannels the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`.]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..pop] channel_op_status pop( value_type & va); [variablelist [[Effects:] [Dechannels a value from the channel. If the channel `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value) or the channel gets `close()`d (return value `closed`). Once the number of items remaining in the channel drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()` or `push_wait_until()` may resume.]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..value_pop] value_type value_pop(); [variablelist [[Effects:] [Dechannels a value from the channel. If the channel `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed or the channel gets `close()`d (which throws an exception). Once the number of items remaining in the channel drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()` or `push_wait_until()` may resume.]] [[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]] ] [member_heading bounded_channel..try_pop] channel_op_status try_pop( value_type & va); [variablelist [[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`, returns `closed`. Otherwise it returns `success` and `va` contains the dechanneled value.]] [[Throws:] [Nothing.]] ] [member_heading bounded_channel..pop_wait_for] template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a time_point as `(now() + timeout_duration)`. If `(! this->is_empty())`, immediately dechannels a value from the channel. 
Otherwise the calling fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value), or the channel gets `close()`d (return value `closed`), or the time as reported by `now()` reaches the computed time_point (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_channel..pop_wait_until] template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); [variablelist [[Effects:] [Accepts an absolute `timeout_time` in any supported time_point type. If `(! this->is_empty())`, immediately dechannels a value from the channel. Otherwise the calling fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains the dechanneled value), or the channel gets `close()`d (return value `closed`), or the time as reported by `now()` reaches the passed time_point (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [endsect]