mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-02 08:52:07 +00:00
Use std::size_t as argument type for barrier constructor. Add bounded_queue::upper_bound(), lower_bound(), value_pop(). Make is_empty(), is_full() const. Add TimePointType template param to push_wait_until(), pop_wait_until().
488 lines
16 KiB
Plaintext
488 lines
16 KiB
Plaintext
[/
|
|
Copyright Oliver Kowalke 2013.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt
|
|
]
|
|
|
|
[section:queues Queues]
|
|
|
|
__boost_fiber__ provides a bounded and an unbounded queue suitable to
|
|
synchronize fibers via message passing.
|
|
|
|
typedef boost::fibers::unbounded_queue< int > queue_t;
|
|
|
|
void send( queue_t & queue)
|
|
{
|
|
for ( int i = 0; i < 5; ++i)
|
|
queue.push( i);
|
|
queue.close();
|
|
}
|
|
|
|
void recv( queue_t & queue)
|
|
{
|
|
int i;
|
|
while ( boost::fibers::queue_op_status::success == queue.pop(i) )
|
|
{ std::cout << "received " << i << std::endl; }
|
|
}
|
|
|
|
queue_t queue;
|
|
boost::fibers::fiber f1( boost::bind( send, ref( queue) ) );
|
|
boost::fibers::fiber f2( boost::bind( recv, ref( queue) ) );
|
|
|
|
f1.join();
|
|
f2.join();
|
|
|
|
[#class_queue_op_status]
|
|
[heading Enumeration `queue_op_status`]
|
|
|
|
Queue operations return the state of the queue.
|
|
|
|
enum class queue_op_status
|
|
{
|
|
success,
|
|
empty,
|
|
full,
|
|
closed,
|
|
timeout
|
|
};
|
|
|
|
[heading `success`]
|
|
[variablelist
|
|
[[Effects:] [Operation was successful.]]
|
|
]
|
|
|
|
[heading `empty`]
|
|
[variablelist
|
|
[[Effects:] [Queue is empty, operation failed.]]
|
|
]
|
|
|
|
[heading `full`]
|
|
[variablelist
|
|
[[Effects:] [Queue is full, operation failed.]]
|
|
]
|
|
|
|
[heading `closed`]
|
|
[variablelist
|
|
[[Effects:] [Queue is closed, operation failed.]]
|
|
]
|
|
|
|
[heading `timeout`]
|
|
[variablelist
|
|
[[Effects:] [The operation did not become ready before the specified timeout elapsed.]]
|
|
]
|
|
|
|
[template_heading unbounded_queue]
|
|
|
|
#include <boost/fiber/unbounded_queue.hpp>
|
|
|
|
template< typename T >
|
|
class unbounded_queue
|
|
{
|
|
public:
|
|
typedef T value_type;
|
|
|
|
unbounded_queue( unbounded_queue const& other) = delete;
|
|
unbounded_queue & operator=( unbounded_queue const& other) = delete;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty();
|
|
|
|
void push( value_type const& va);
|
|
void push( value_type && va);
|
|
|
|
queue_op_status pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
queue_op_status pop_wait_for( value_type & va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
queue_op_status pop_wait_until( value_type & va,
|
|
clock_type::time_point const& timeout_time);
|
|
queue_op_status try_pop( value_type & va);
|
|
};
|
|
|
|
[member_heading unbounded_queue..is_closed]
|
|
|
|
bool is_closed() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [`true` if queue has been closed.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [The queue is not closed by default.]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..close]
|
|
|
|
void close();
|
|
|
|
[variablelist
|
|
[[Effects:] [Deactivates the queue. No values can be put after calling
|
|
`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
|
|
or `this->pop_wait_until()` will return `closed`.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
|
|
that no more values will arrive.]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..is_empty]
|
|
|
|
bool is_empty();
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the queue currently contains no data.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient. An `is_empty()` queue can become
|
|
non-empty.]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..push]
|
|
|
|
void push( value_type const& va);
|
|
void push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Enqueues the value in the queue and wakes up a fiber blocked on
|
|
`this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()`.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..pop]
|
|
|
|
queue_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the queue. If the queue `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the queue gets `close()`d
|
|
(return value `closed`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
queue_op_status pop_wait_for( value_type & va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `chrono::duration` and internally
|
|
computes a `clock_type::time_point` as `(clock_type::now() +
|
|
timeout_duration)`. If `(! this->is_empty())`, immediately dequeues a value
|
|
from the queue. Otherwise the fiber gets suspended until at least one new item
|
|
is `push()`ed (return value `success` and `va` contains dequeued value), or
|
|
the queue gets `close()`d (return value `closed`), or the time as reported by
|
|
`clock_type::now()` reaches the computed `clock_type::time_point`
|
|
(return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..pop_wait_until]
|
|
|
|
queue_op_status pop_wait_until( value_type & va,
|
|
clock_type::time_point const& timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts a `clock_type::time_point`. If `(! this->is_empty())`,
|
|
immediately dequeues a value from the queue. Otherwise the fiber gets suspended
|
|
until at least one new item is `push()`ed (return value `success` and `va`
|
|
contains dequeued value), or the queue gets `close()`d (return value `closed`),
|
|
or the time as reported by `clock_type::now()` reaches the passed
|
|
`clock_type::time_point` (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading unbounded_queue..try_pop]
|
|
|
|
queue_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
|
|
[template_heading bounded_queue]
|
|
|
|
#include <boost/fiber/bounded_queue.hpp>
|
|
|
|
template< typename T >
|
|
class bounded_queue
|
|
{
|
|
public:
|
|
typedef T value_type;
|
|
|
|
bounded_queue( bounded_queue const& other) = delete;
|
|
bounded_queue & operator=( bounded_queue const& other) = delete;
|
|
|
|
bounded_queue( std::size_t wm);
|
|
bounded_queue( std::size_t hwm, std::size_t lwm);
|
|
|
|
std::size_t upper_bound() const;
|
|
std::size_t lower_bound() const;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty() const;
|
|
bool is_full() const;
|
|
|
|
queue_op_status push( value_type const& va);
|
|
queue_op_status push( value_type && va);
|
|
template< typename Rep, typename Period >
|
|
queue_op_status push_wait_for( value_type const& va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
queue_op_status push_wait_for( value_type && va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename TimePointType >
|
|
queue_op_status push_wait_until( value_type const& va,
|
|
TimePointType const& timeout_time);
|
|
template< typename TimePointType >
|
|
queue_op_status push_wait_until( value_type && va,
|
|
TimePointType const& timeout_time);
|
|
queue_op_status try_push( value_type const& va);
|
|
queue_op_status try_push( value_type && va);
|
|
|
|
value_type value_pop();
|
|
queue_op_status pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
queue_op_status pop_wait_for( value_type & va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename TimePointType >
|
|
queue_op_status pop_wait_until( value_type & va,
|
|
TimePointType const& timeout_time);
|
|
queue_op_status try_pop( value_type & va);
|
|
};
|
|
|
|
[heading Constructor]
|
|
|
|
bounded_queue( std::size_t wm);
|
|
bounded_queue( std::size_t hwm, std::size_t lwm);
|
|
|
|
[variablelist
|
|
[[Preconditions:] [`hwm >= lwm`]]
|
|
[[Effects:] [Constructs an object of class `bounded_queue`. The constructor
|
|
with two arguments constructs an object of class `bounded_queue` with a
|
|
high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor
|
|
with one argument effectively sets both `hwm` and `lwm` to the same value
|
|
`wm`.]]
|
|
[[Throws:] [`invalid_argument` if `lwm > hwm`.]]
|
|
[[Notes:] [Once the number of values in the queue reaches `hwm`, any call to
|
|
`push()`, `push_wait_for()` or `push_wait_until()` will block until the number
|
|
of values in the queue has dropped below `lwm`. That is, if `lwm < hwm`, the
|
|
queue can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()`
|
|
calls will block (`is_full()` returns `true`) even though the number of values
|
|
in the queue is less than `hwm`.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..upper_bound]
|
|
|
|
std::size_t upper_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the high-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..lower_bound]
|
|
|
|
std::size_t lower_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the low-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..is_closed]
|
|
|
|
bool is_closed() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [`true` if queue has been closed.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [The queue is not closed by default.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..close]
|
|
|
|
void close();
|
|
|
|
[variablelist
|
|
[[Effects:] [Deactivates the queue. No values can be put after calling
|
|
`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or
|
|
`this->pop_wait_until()` will return `closed`.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
|
|
that no more values will arrive.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..is_empty]
|
|
|
|
bool is_empty() const;
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the queue currently contains no data.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient. An `is_empty()` queue can become
|
|
non-empty.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..is_full]
|
|
|
|
bool is_full() const;
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the queue cannot accept more data at present.
|
|
This happens when the number of values in the queue reaches `wm` or `hwm`.
|
|
Once the queue becomes full, however, it continues refusing new values until
|
|
the number of values drops below `lwm`.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient.]]
|
|
]
|
|
|
|
[member_heading bounded_queue..push]
|
|
|
|
queue_op_status push( value_type const& va);
|
|
queue_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_closed()`, returns `closed`. If `(!
|
|
this->is_full())`, enqueues the value in the queue, wakes up a fiber blocked
|
|
on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns
|
|
`success`. Otherwise the fiber gets suspended until the number of values in the
|
|
queue drops below `lwm` (return value `success`), or the queue is `close()`d
|
|
(return value `closed`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..push_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
queue_op_status push_wait_for( value_type const& va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
queue_op_status push_wait_for( value_type && va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `chrono::duration` and internally computes a time_point
|
|
as `(now() + timeout_duration)`. If `this->is_closed()`, returns `closed`. If
|
|
`(! this->is_full())`, enqueues the value in the queue, wakes up a fiber
|
|
blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()`
|
|
and returns `success`. Otherwise the calling fiber is suspended until the
|
|
number of values in the queue drops below `lwm` (return value `success`), the
|
|
queue is `close()`d (return value `closed`), or the time as reported by
|
|
`now()` reaches the computed time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..push_wait_until]
|
|
|
|
template< typename TimePointType >
|
|
queue_op_status push_wait_until( value_type const& va,
|
|
TimePointType const& timeout_time);
|
|
template< typename TimePointType >
|
|
queue_op_status push_wait_until( value_type && va,
|
|
TimePointType const& timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts an absolute `timeout_time` in any supported time_point
|
|
type. If `this->is_closed()`, returns `closed`. If `(! this->is_full())`,
|
|
enqueues the value in the queue, wakes up a fiber blocked on `this->pop()`,
|
|
`this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`.
|
|
Otherwise the calling fiber is suspended until the number of values in the
|
|
queue drops below `lwm` (return value `success`), the queue is `close()`d
|
|
(return value `closed`), or the time as reported by `now()` reaches the passed
|
|
time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..try_push]
|
|
|
|
queue_op_status try_push( value_type const& va);
|
|
queue_op_status try_push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_full()`, returns `full`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise enqueues the value in the queue, wakes up a fiber
|
|
blocked on `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and
|
|
returns `success`.]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..value_pop]
|
|
|
|
value_type value_pop();
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the queue. If the queue `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed or the queue
|
|
gets `close()`d (which throws an exception). Once the number of items
|
|
remaining in the queue drops below `lwm`, any fibers blocked on `push()`,
|
|
`push_wait_for()` or `push_wait_until()` may resume.]]
|
|
[[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..pop]
|
|
|
|
queue_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the queue. If the queue `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the queue gets `close()`d
|
|
(return value `closed`). Once the number of items remaining in the queue
|
|
drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()` or
|
|
`push_wait_until()` may resume.]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
queue_op_status pop_wait_for( value_type & va,
|
|
chrono::duration< Rep, Period > const& timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `chrono::duration` and internally computes a time_point
|
|
as `(now() + timeout_duration)`. If `(! this->is_empty())`, immediately
|
|
dequeues a value from the queue. Otherwise the calling fiber gets suspended
|
|
until at least one new item is `push()`ed (return value `success` and `va`
|
|
contains dequeued value), or the queue gets `close()`d (return value
|
|
`closed`), or the time as reported by `now()` reaches the computed time_point
|
|
(return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..pop_wait_until]
|
|
|
|
template< typename TimePointType >
|
|
queue_op_status pop_wait_until( value_type & va,
|
|
TimePointType const& timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts an absolute `timeout_time` in any supported time_point
|
|
type. If `(! this->is_empty())`, immediately dequeues a value from the queue.
|
|
Otherwise the calling fiber gets suspended until at least one new item is
|
|
`push()`ed (return value `success` and `va` contains dequeued value), or the
|
|
queue gets `close()`d (return value `closed`), or the time as reported by
|
|
`now()` reaches the passed time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_queue..try_pop]
|
|
|
|
queue_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[endsect]
|