mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-13 12:22:36 +00:00
The descriptions of [un]bounded_channel::push() and the other push variants are very similar, and must be kept consistent. Similarly, all the pop variants must be kept consistent. Use QuickBook templates to supply much of the wording for these methods.
494 lines
17 KiB
Plaintext
494 lines
17 KiB
Plaintext
[/
|
|
Copyright Oliver Kowalke 2013.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt
|
|
]
|
|
|
|
[section:channels Channels]
|
|
|
|
__boost_fiber__ provides a bounded and an unbounded channel suitable to
|
|
synchronize fibers via message passing.
|
|
|
|
typedef boost::fibers::unbounded_channel< int > channel_t;
|
|
|
|
void send( channel_t & channel) {
|
|
for ( int i = 0; i < 5; ++i) {
|
|
channel.push( i);
|
|
}
|
|
channel.close();
|
|
}
|
|
|
|
void recv( channel_t & channel) {
|
|
int i;
|
|
while ( boost::fibers::channel_op_status::success == channel.pop(i) ) {
|
|
std::cout << "received " << i << std::endl;
|
|
}
|
|
}
|
|
|
|
channel_t channel;
|
|
boost::fibers::fiber f1( std::bind( send, std::ref( channel) ) );
|
|
boost::fibers::fiber f2( std::bind( recv, std::ref( channel) ) );
|
|
|
|
f1.join();
|
|
f2.join();
|
|
|
|
[#class_channel_op_status]
|
|
[heading Enumeration `channel_op_status`]
|
|
|
|
Channel operations return the state of the channel.
|
|
|
|
enum class channel_op_status {
|
|
success,
|
|
empty,
|
|
full,
|
|
closed,
|
|
timeout
|
|
};
|
|
|
|
[heading `success`]
|
|
[variablelist
|
|
[[Effects:] [Operation was successful.]]
|
|
]
|
|
|
|
[heading `empty`]
|
|
[variablelist
|
|
[[Effects:] [Channel is empty, operation failed.]]
|
|
]
|
|
|
|
[heading `full`]
|
|
[variablelist
|
|
[[Effects:] [Channel is full, operation failed.]]
|
|
]
|
|
|
|
[heading `closed`]
|
|
[variablelist
|
|
[[Effects:] [Channel is closed, operation failed.]]
|
|
]
|
|
|
|
[heading `timeout`]
|
|
[variablelist
|
|
[[Effects:] [The operation did not become ready before specified timeout elapsed.]]
|
|
]
|
|
|
|
[template_heading unbounded_channel]
|
|
|
|
#include <boost/fiber/unbounded_channel.hpp>
|
|
|
|
template< typename T >
|
|
class unbounded_channel {
|
|
public:
|
|
typedef T value_type;
|
|
|
|
unbounded_channel( unbounded_channel const& other) = delete;
|
|
unbounded_channel & operator=( unbounded_channel const& other) = delete;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty();
|
|
|
|
channel_op_status push( value_type && va);
|
|
|
|
channel_op_status pop( value_type & va);
|
|
value_type value_pop();
|
|
channel_op_status try_pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock,
|
|
Duration > const& timeout_time);
|
|
};
|
|
|
|
[template xchannel_is_closed[cls]
|
|
[member_heading [cls]..is_closed]
|
|
|
|
bool is_closed() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [`true` if channel has been closed.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [The channel is not closed by default.]]
|
|
]
|
|
]
|
|
[xchannel_is_closed unbounded_channel]
|
|
|
|
[template xchannel_close[cls]
|
|
[member_heading [cls]..close]
|
|
|
|
void close();
|
|
|
|
[variablelist
|
|
[[Effects:] [Deactivates the channel. No values can be put after calling
|
|
`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
|
|
or `this->pop_wait_until()` will return `closed`. Fibers blocked in
|
|
`this->value_pop()` will receive an exception.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
|
|
that no more values will arrive.]]
|
|
]
|
|
]
|
|
[xchannel_close unbounded_channel]
|
|
|
|
[template xchannel_is_empty[cls]
|
|
[member_heading [cls]..is_empty]
|
|
|
|
bool is_empty();
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the channel currently contains no data.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient. An `is_empty()` channel can become
|
|
non-empty.]]
|
|
]
|
|
]
|
|
[xchannel_is_empty unbounded_channel]
|
|
|
|
[template xchannel_push_effects[enqueues] If `this->is_closed()`, returns
|
|
`closed`. [enqueues] the value in the channel, wakes up a fiber blocked on
|
|
`this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
|
|
`this->pop_wait_until()` and returns `success`.]
|
|
|
|
[member_heading unbounded_channel..push]
|
|
|
|
channel_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[xchannel_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[template xchannel_pop[cls]
|
|
[member_heading [cls]..pop]
|
|
|
|
channel_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the channel gets `close()`d
|
|
(return value `closed`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop unbounded_channel]
|
|
|
|
[template xchannel_value_pop[cls]
|
|
[member_heading [cls]..value_pop]
|
|
|
|
value_type value_pop();
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed or the channel
|
|
gets `close()`d (which throws an exception).]]
|
|
[[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_value_pop unbounded_channel]
|
|
|
|
[template xchannel_try_pop[cls]
|
|
[member_heading [cls]..try_pop]
|
|
|
|
channel_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
]
|
|
[xchannel_try_pop unbounded_channel]
|
|
|
|
[template xchannel_pop_wait_until_effects[endtime] If `(! this->is_empty())`,
|
|
immediately dequeues a value from the channel. Otherwise the fiber gets
|
|
suspended until at least one new item is `push()`ed (return value `success`
|
|
and `va` contains dequeued value), or the channel gets `close()`d (return
|
|
value `closed`), or the system time reaches [endtime] (return value
|
|
`timeout`).]
|
|
|
|
[template xchannel_pop_wait_for[cls]
|
|
[member_heading [cls]..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
|
|
time as `(`system time + `timeout_duration)`.
|
|
[xchannel_pop_wait_until_effects the computed timeout time]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop_wait_for unbounded_channel]
|
|
|
|
[template xchannel_pop_wait_until[cls]
|
|
[member_heading [cls]..pop_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
|
|
[xchannel_pop_wait_until_effects the passed `time_point`]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop_wait_until unbounded_channel]
|
|
|
|
|
|
[template_heading bounded_channel]
|
|
|
|
#include <boost/fiber/bounded_channel.hpp>
|
|
|
|
template< typename T >
|
|
class bounded_channel {
|
|
public:
|
|
typedef T value_type;
|
|
|
|
bounded_channel( bounded_channel const& other) = delete;
|
|
bounded_channel & operator=( bounded_channel const& other) = delete;
|
|
|
|
bounded_channel( std::size_t wm);
|
|
bounded_channel( std::size_t hwm, std::size_t lwm);
|
|
|
|
std::size_t upper_bound() const;
|
|
std::size_t lower_bound() const;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty() const;
|
|
bool is_full() const;
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type const& va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type && va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type const& va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type && va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
value_type value_pop();
|
|
channel_op_status pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
channel_op_status try_pop( value_type & va);
|
|
};
|
|
|
|
[heading Constructor]
|
|
|
|
bounded_channel( std::size_t wm);
|
|
bounded_channel( std::size_t hwm, std::size_t lwm);
|
|
|
|
[variablelist
|
|
[[Preconditions:] [`hwm >= lwm`]]
|
|
[[Effects:] [Constructs an object of class `bounded_channel`. The constructor
|
|
with two arguments constructs an object of class `bounded_channel` with a
|
|
high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor
|
|
with one argument effectively sets both `hwm` and `lwm` to the same value
|
|
`wm`.]]
|
|
[[Throws:] [`invalid_argument` if `lwm > hwm`.]]
|
|
[[Notes:] [Once the number of values in the channel reaches `hwm`, any call to
|
|
`push()`, `push_wait_for()` or `push_wait_until()` will block until the number
|
|
of values in the channel has dropped below `lwm`. That is, if `lwm < hwm`, the
|
|
channel can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()`
|
|
calls will block (`is_full()` returns `true`) even though the number of values
|
|
in the channel is less than `hwm`.]]
|
|
]
|
|
|
|
[member_heading bounded_channel..upper_bound]
|
|
|
|
std::size_t upper_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the high-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading bounded_channel..lower_bound]
|
|
|
|
std::size_t lower_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the low-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[xchannel_is_closed bounded_channel]
|
|
[xchannel_close bounded_channel]
|
|
[xchannel_is_empty bounded_channel]
|
|
|
|
[member_heading bounded_channel..is_full]
|
|
|
|
bool is_full() const;
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the channel cannot accept more data at present.
|
|
This happens when the number of values in the channel reaches `wm` or `hwm`.
|
|
Once the channel becomes full, however, it continues refusing new values until
|
|
the number of values drops below `lwm`.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient.]]
|
|
]
|
|
|
|
[template bounded_channel_push_effects[or] [xchannel_push_effects If `(!
|
|
this->is_full())`, enqueues] Otherwise the fiber gets suspended until the
|
|
number of values in the channel drops below `lwm` (return value
|
|
`success`)[or] the channel is `close()`d (return value `closed`)]
|
|
|
|
[member_heading bounded_channel..push]
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[bounded_channel_push_effects or].]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..push_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type const& va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type && va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a
|
|
time_point as `(`system time + `timeout_duration)`.
|
|
[bounded_channel_push_effects ,], or the system time reaches the computed
|
|
time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..push_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type const& va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type && va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts an absolute `timeout_time` in any supported time_point
|
|
type. [bounded_channel_push_effects ,], or the system time reaches the passed
|
|
time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..try_push]
|
|
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_full()`, returns `full`.
|
|
[xchannel_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..pop]
|
|
|
|
channel_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the channel gets `close()`d
|
|
(return value `closed`). Once the number of items remaining in the channel
|
|
drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()` or
|
|
`push_wait_until()` may resume.]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..value_pop]
|
|
|
|
value_type value_pop();
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed or the channel
|
|
gets `close()`d (which throws an exception). Once the number of items
|
|
remaining in the channel drops below `lwm`, any fibers blocked on `push()`,
|
|
`push_wait_for()` or `push_wait_until()` may resume.]]
|
|
[[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..try_pop]
|
|
|
|
channel_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading bounded_channel..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a time_point
|
|
as `(now() + timeout_duration)`. If `(! this->is_empty())`, immediately
|
|
dequeues a value from the channel. Otherwise the calling fiber gets suspended
|
|
until at least one new item is `push()`ed (return value `success` and `va`
|
|
contains dequeued value), or the channel gets `close()`d (return value
|
|
`closed`), or the time as reported by `now()` reaches the computed time_point
|
|
(return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..pop_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts an absolute `timeout_time` in any supported time_point
|
|
type. If `(! this->is_empty())`, immediately dequeues a value from the channel.
|
|
Otherwise the calling fiber gets suspended until at least one new item is
|
|
`push()`ed (return value `success` and `va` contains dequeued value), or the
|
|
channel gets `close()`d (return value `closed`), or the time as reported by
|
|
`now()` reaches the passed time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[endsect]
|