[/ Copyright Oliver Kowalke 2013. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt ] [section:queues Queues] __boost_fiber__ provides a bounded and a unbounded queue suitable to synchonize fibers via message passing. typedef boost::fibers::unbounded_queue< int > queue_t; void send( queue_t & queue) { for ( int i = 0; i < 5; ++i) queue.push( i); queue.close(); } void recv( queue_t & queue) { int i; while ( boost::fibers::queue_op_status::success == queue.pop(i) ) { std::cout << "received " << i << std::endl; } } queue_t queue; boost::fibers::fiber f1( boost::bind( send, ref( queue) ) ); boost::fibers::fiber f2( boost::bind( recv, ref( queue) ) ); f1.join(); f2.join(); [#class_queue_op_status] [heading Enumeration `queue_op_status`] Queue operations return the state of the queue. enum class queue_op_status { success, empty, full, closed, timeout }; [heading `success`] [variablelist [[Effects:] [Operation was successful.]] ] [heading `empty`] [variablelist [[Effects:] [Queue is empty, operation failed.]] ] [heading `full`] [variablelist [[Effects:] [Queue is full, operation failed.]] ] [heading `closed`] [variablelist [[Effects:] [Queue is closed, operation failed.]] ] [heading `timeout`] [variablelist [[Effects:] [The operation did not become ready before specified timeout elapsed.]] ] [template_heading unbounded_queue] #include template< typename T > class unbounded_queue { public: typedef T value_type; unbounded_queue( unbounded_queue const& other) = delete; unbounded_queue & operator=( unbounded_queue const& other) = delete; bool is_closed() const; void close(); bool is_empty(); void push( value_type const& va); void push( value_type && va); queue_op_status pop( value_type & va); template< typename Rep, typename Period > queue_op_status pop_wait_for( value_type & va, chrono::duration< Rep, Period > const& timeout_duration); queue_op_status pop_wait_until( value_type & va, clock_type::time_point const& timeout_time); queue_op_status try_pop( value_type & va); }; [member_heading unbounded_queue..is_closed] bool is_closed() const; [variablelist [[Returns:] [`true` if queue has been closed.]] [[Throws:] [Nothing.]] [[Note:] [The queue is not closed by default.]] ] [member_heading unbounded_queue..close] void close(); [variablelist [[Effects:] [Deactivates the queue. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. It informs waiting consumers that no more values will arrive.]] ] [member_heading unbounded_queue..is_empty] bool is_empty(); [variablelist [[Effects:] [Returns `true` if the queue currently contains no data.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient. An `is_empty()` queue can become non-empty.]] ] [member_heading unbounded_queue..push] void push( value_type const& va); void push( value_type && va); [variablelist [[Effects:] [Enqueues the value in the queue and wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()`.]] [[Throws:] [Nothing.]] ] [member_heading unbounded_queue..pop] queue_op_status pop( value_type & va); [variablelist [[Effects:] [Dequeues a value from the queue. If the queue `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value) or the queue gets `close()`d (return value `closed`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading unbounded_queue..pop_wait_for] template< typename Rep, typename Period > queue_op_status pop_wait_for( value_type & va, chrono::duration< Rep, Period > const& timeout_duration) [variablelist [[Effects:] [Accepts `chrono::duration` and internally computes a `clock_type::time_point` as `(clock_type::now() + timeout_duration)`. If `(! this->is_empty())`, immediately dequeues a value from the queue. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the queue gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the computed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading unbounded_queue..pop_wait_until] queue_op_status pop_wait_until( value_type & va, clock_type::time_point const& timeout_time) [variablelist [[Effects:] [Accepts a `clock_type::time_point`. If `(! this->is_empty())`, immediately dequeues a value from the queue. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the queue gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the passed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading unbounded_queue..try_pop] queue_op_status try_pop( value_type & va); [variablelist [[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`, returns `closed`. Otherwise it returns `success` and `va` contains the dequeued value.]] [[Throws:] [Nothing.]] ] [template_heading bounded_queue] #include template< typename T > class bounded_queue { public: typedef T value_type; bounded_queue( bounded_queue const& other) = delete; bounded_queue & operator=( bounded_queue const& other) = delete; bounded_queue( std::size_t wm); bounded_queue( std::size_t hwm, std::size_t lwm); bool is_closed() const; void close(); bool is_empty(); bool is_full(); queue_op_status push( value_type const& va); queue_op_status push( value_type && va); template< typename Rep, typename Period > queue_op_status push_wait_for( value_type const& va, chrono::duration< Rep, Period > const& timeout_duration); template< typename Rep, typename Period > queue_op_status push_wait_for( value_type && va, chrono::duration< Rep, Period > const& timeout_duration); queue_op_status push_wait_until( value_type const& va, clock_type::time_point const& timeout_time); queue_op_status push_wait_until( value_type && va, clock_type::time_point const& timeout_time); queue_op_status try_push( value_type const& va); queue_op_status try_push( value_type && va); queue_op_status pop( value_type & va); template< typename Rep, typename Period > queue_op_status pop_wait_for( value_type & va, chrono::duration< Rep, Period > const& timeout_duration); queue_op_status pop_wait_until( value_type & va, clock_type::time_point const& timeout_time); queue_op_status try_pop( value_type & va); }; [heading Constructor] bounded_queue( std::size_t wm); bounded_queue( std::size_t hwm, std::size_t lwm); [variablelist [[Preconditions:] [`hwm >= lwm`]] [[Effects:] [Constructs an object of class `bounded_queue`. The constructor with two arguments constructs an object of class `bounded_queue` with a high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor with one argument effectively sets both `hwm` and `lwm` to the same value `wm`.]] [[Throws:] [`invalid_argument` if `lwm > hwm`.]] [[Notes:] [Once the number of values in the queue reaches `hwm`, any call to `push()`, `push_wait_for()` or `push_wait_until()` will block until the number of values in the queue has dropped below `lwm`. That is, if `lwm < hwm`, the queue can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()` calls will block (`is_full()` returns `true`) even though the number of values in the queue is less than `hwm`.]] ] [member_heading bounded_queue..is_closed] bool is_closed() const; [variablelist [[Returns:] [`true` if queue has been closed.]] [[Throws:] [Nothing.]] [[Note:] [The queue is not closed by default.]] ] [member_heading bounded_queue..close] void close(); [variablelist [[Effects:] [Deactivates the queue. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. It informs waiting consumers that no more values will arrive.]] ] [member_heading bounded_queue..is_empty] bool is_empty(); [variablelist [[Effects:] [Returns `true` if the queue currently contains no data.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient. An `is_empty()` queue can become non-empty.]] ] [member_heading bounded_queue..is_full] bool is_full(); [variablelist [[Effects:] [Returns `true` if the queue cannot accept more data at present. This happens when the number of values in the queue reaches `wm` or `hwm`. Once the queue becomes full, however, it continues refusing new values until the number of values drops below `lwm`.]] [[Throws:] [Nothing.]] [[Note:] [This condition is transient.]] ] [member_heading bounded_queue..push] queue_op_status push( value_type const& va); queue_op_status push( value_type && va); [variablelist [[Effects:] [If `this->is_closed()`, returns `closed`. If `(! this->is_full())`, enqueues the value in the queue, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the fiber gets suspended until the number of values in the queue drops below `lwm` (return value `success`), or the queue is `close()`d (return value `closed`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..push_wait_for] template< typename Rep, typename Period > queue_op_status push_wait_for( value_type const& va, chrono::duration< Rep, Period > const& timeout_duration); template< typename Rep, typename Period > queue_op_status push_wait_for( value_type && va, chrono::duration< Rep, Period > const& timeout_duration); [variablelist [[Effects:] [Accepts `chrono::duration` and internally computes a `clock_type::time_point` as `(clock_type::now() + timeout_duration)`. If `this->is_closed()`, returns `closed`. If `(! this->is_full())`, enqueues the value in the queue, wakes up a fiber blocked on `this->pop()` `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the fiber gets suspended until the number of values in the queue drops below `lwm` (return value `success`), the queue is `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the computed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..push_wait_until] queue_op_status push_wait_until( value_type const& va, clock_type::time_point const& timeout_time); queue_op_status push_wait_until( value_type && va, clock_type::time_point const& timeout_time); [variablelist [[Effects:] [Accepts a `clock_type::time_point`. If `this->is_closed()`, returns `closed`. If `(! this->is_full())`, enqueues the value in the queue, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. Otherwise the fiber gets suspended until the number of values in the queue drops below `lwm` (return value `success`), the queue is `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the passed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..try_push] queue_op_status try_push( value_type const& va); queue_op_status try_push( value_type && va); [variablelist [[Effects:] [If `this->is_full()`, returns `full`. If `this->is_closed()`, returns `closed`. Otherwise enqueues the value in the queue, wakes up a fiber blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`.]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..pop] queue_op_status pop( value_type & va); [variablelist [[Effects:] [Dequeues a value from the queue. If the queue `is_empty()`, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value) or the queue gets `close()`d (return value `closed`). Once the number of items remaining in the queue drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()` or `push_wait_until()` may resume.]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..pop_wait_for] template< typename Rep, typename Period > queue_op_status pop_wait_for( value_type & va, chrono::duration< Rep, Period > const& timeout_duration) [variablelist [[Effects:] [Accepts `chrono::duration` and internally computes a `clock_type::time_point` as `(clock_type::now() + timeout_duration)`. If `(! this->is_empty())`, immediately dequeues a value from the queue. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the queue gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the computed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..pop_wait_until] queue_op_status pop_wait_until( value_type & va, clock_type::time_point const& timeout_time) [variablelist [[Effects:] [Accepts a `clock_type::time_point`. If `(! this->is_empty())`, immediately dequeues a value from the queue. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the queue gets `close()`d (return value `closed`), or the time as reported by `clock_type::now()` reaches the passed `clock_type::time_point` (return value `timeout`).]] [[Throws:] [__fiber_interrupted__]] ] [member_heading bounded_queue..try_pop] queue_op_status try_pop( value_type & va); [variablelist [[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`, returns `closed`. Otherwise it returns `success` and `va` contains the dequeued value.]] [[Throws:] [Nothing.]] ] [endsect]