mirror of
https://github.com/boostorg/fiber.git
synced 2026-02-13 12:22:36 +00:00
Well, almost the same: bounded_channel pop operations include the note about unblocking blocked push operations. Parameterize xchannel_pop (et al.) to allow this divergence between unbounded_channel and bounded_channel pop methods.
433 lines
14 KiB
Plaintext
433 lines
14 KiB
Plaintext
[/
|
|
Copyright Oliver Kowalke 2013.
|
|
Distributed under the Boost Software License, Version 1.0.
|
|
(See accompanying file LICENSE_1_0.txt or copy at
|
|
http://www.boost.org/LICENSE_1_0.txt
|
|
]
|
|
|
|
[section:channels Channels]
|
|
|
|
__boost_fiber__ provides a bounded and a unbounded channel suitable to
|
|
synchonize fibers via message passing.
|
|
|
|
typedef boost::fibers::unbounded_channel< int > channel_t;
|
|
|
|
void send( channel_t & channel) {
|
|
for ( int i = 0; i < 5; ++i) {
|
|
channel.push( i);
|
|
}
|
|
channel.close();
|
|
}
|
|
|
|
void recv( channel_t & channel) {
|
|
int i;
|
|
while ( boost::fibers::channel_op_status::success == channel.pop(i) ) {
|
|
std::cout << "received " << i << std::endl;
|
|
}
|
|
}
|
|
|
|
channel_t channel;
|
|
boost::fibers::fiber f1( std::bind( send, ref( channel) ) );
|
|
boost::fibers::fiber f2( std::bind( recv, ref( channel) ) );
|
|
|
|
f1.join();
|
|
f2.join();
|
|
|
|
[#class_channel_op_status]
|
|
[heading Enumeration `channel_op_status`]
|
|
|
|
channel operations return the state of the channel.
|
|
|
|
enum class channel_op_status {
|
|
success,
|
|
empty,
|
|
full,
|
|
closed,
|
|
timeout
|
|
};
|
|
|
|
[heading `success`]
|
|
[variablelist
|
|
[[Effects:] [Operation was successful.]]
|
|
]
|
|
|
|
[heading `empty`]
|
|
[variablelist
|
|
[[Effects:] [channel is empty, operation failed.]]
|
|
]
|
|
|
|
[heading `full`]
|
|
[variablelist
|
|
[[Effects:] [channel is full, operation failed.]]
|
|
]
|
|
|
|
[heading `closed`]
|
|
[variablelist
|
|
[[Effects:] [channel is closed, operation failed.]]
|
|
]
|
|
|
|
[heading `timeout`]
|
|
[variablelist
|
|
[[Effects:] [The operation did not become ready before specified timeout elapsed.]]
|
|
]
|
|
|
|
[template_heading unbounded_channel]
|
|
|
|
#include <boost/fiber/unbounded_channel.hpp>
|
|
|
|
template< typename T >
|
|
class unbounded_channel {
|
|
public:
|
|
typedef T value_type;
|
|
|
|
unbounded_channel( unbounded_channel const& other) = delete;
|
|
unbounded_channel & operator=( unbounded_channel const& other) = delete;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty();
|
|
|
|
channel_op_status push( value_type && va);
|
|
|
|
channel_op_status pop( value_type & va);
|
|
value_type value_pop();
|
|
channel_op_status try_pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock,
|
|
Duration > const& timeout_time);
|
|
};
|
|
|
|
[template xchannel_is_closed[cls]
|
|
[member_heading [cls]..is_closed]
|
|
|
|
bool is_closed() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [`true` if channel has been closed.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [The channel is not closed by default.]]
|
|
]
|
|
]
|
|
[xchannel_is_closed unbounded_channel]
|
|
|
|
[template xchannel_close[cls]
|
|
[member_heading [cls]..close]
|
|
|
|
void close();
|
|
|
|
[variablelist
|
|
[[Effects:] [Deactivates the channel. No values can be put after calling
|
|
`this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
|
|
or `this->pop_wait_until()` will return `closed`. Fibers blocked in
|
|
`this->value_pop()` will receive an exception.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [`close()` is like closing a pipe. It informs waiting consumers
|
|
that no more values will arrive.]]
|
|
]
|
|
]
|
|
[xchannel_close unbounded_channel]
|
|
|
|
[template xchannel_is_empty[cls]
|
|
[member_heading [cls]..is_empty]
|
|
|
|
bool is_empty();
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the channel currently contains no data.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient. An `is_empty()` channel can become
|
|
non-empty.]]
|
|
]
|
|
]
|
|
[xchannel_is_empty unbounded_channel]
|
|
|
|
[template xchannel_push_effects[enqueues] If `this->is_closed()`, returns
|
|
`closed`. [enqueues] the value in the channel, wakes up a fiber blocked on
|
|
`this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
|
|
`this->pop_wait_until()` and returns `success`.]
|
|
|
|
[member_heading unbounded_channel..push]
|
|
|
|
channel_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[xchannel_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[template xchannel_pop[cls unblocking]
|
|
[member_heading [cls]..pop]
|
|
|
|
channel_op_status pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed (return value
|
|
`success` and `va` contains dequeued value) or the channel gets `close()`d
|
|
(return value `closed`)[unblocking]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop unbounded_channel .]
|
|
|
|
[template xchannel_value_pop[cls unblocking]
|
|
[member_heading [cls]..value_pop]
|
|
|
|
value_type value_pop();
|
|
|
|
[variablelist
|
|
[[Effects:] [Dequeues a value from the channel. If the channel `is_empty()`, the
|
|
fiber gets suspended until at least one new item is `push()`ed or the channel
|
|
gets `close()`d (which throws an exception)[unblocking]]]
|
|
[[Throws:] [`logic_error` if `*this` is closed; __fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_value_pop unbounded_channel .]
|
|
|
|
[template xchannel_try_pop[cls unblocking]
|
|
[member_heading [cls]..try_pop]
|
|
|
|
channel_op_status try_pop( value_type & va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_empty()`, returns `empty`. If `this->is_closed()`,
|
|
returns `closed`. Otherwise it returns `success` and `va` contains the
|
|
dequeued value[unblocking]]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
]
|
|
[xchannel_try_pop unbounded_channel .]
|
|
|
|
[template xchannel_pop_wait_until_effects[endtime unblocking] If `(!
|
|
this->is_empty())`, immediately dequeues a value from the channel. Otherwise
|
|
the fiber gets suspended until at least one new item is `push()`ed (return
|
|
value `success` and `va` contains dequeued value), or the channel gets
|
|
`close()`d (return value `closed`), or the system time reaches [endtime]
|
|
(return value `timeout`)[unblocking]]
|
|
|
|
[template xchannel_pop_wait_for[cls unblocking]
|
|
[member_heading [cls]..pop_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const& timeout_duration)
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
|
|
time as (system time + `timeout_duration`).
|
|
[xchannel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop_wait_for unbounded_channel .]
|
|
|
|
[template xchannel_pop_wait_until[cls unblocking]
|
|
[member_heading [cls]..pop_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const& timeout_time)
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
|
|
[xchannel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
]
|
|
[xchannel_pop_wait_until unbounded_channel .]
|
|
|
|
|
|
[template_heading bounded_channel]
|
|
|
|
#include <boost/fiber/bounded_channel.hpp>
|
|
|
|
template< typename T >
|
|
class bounded_channel {
|
|
public:
|
|
typedef T value_type;
|
|
|
|
bounded_channel( bounded_channel const& other) = delete;
|
|
bounded_channel & operator=( bounded_channel const& other) = delete;
|
|
|
|
bounded_channel( std::size_t wm);
|
|
bounded_channel( std::size_t hwm, std::size_t lwm);
|
|
|
|
std::size_t upper_bound() const;
|
|
std::size_t lower_bound() const;
|
|
|
|
bool is_closed() const;
|
|
void close();
|
|
|
|
bool is_empty() const;
|
|
bool is_full() const;
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type const& va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type && va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type const& va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type && va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
value_type value_pop();
|
|
channel_op_status pop( value_type & va);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status pop_wait_for( value_type & va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status pop_wait_until( value_type & va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
channel_op_status try_pop( value_type & va);
|
|
};
|
|
|
|
[heading Constructor]
|
|
|
|
bounded_channel( std::size_t wm);
|
|
bounded_channel( std::size_t hwm, std::size_t lwm);
|
|
|
|
[variablelist
|
|
[[Preconditions:] [`hwm >= lwm`]]
|
|
[[Effects:] [Constructs an object of class `bounded_channel`. The constructor
|
|
with two arguments constructs an object of class `bounded_channel` with a
|
|
high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor
|
|
with one argument effectively sets both `hwm` and `lwm` to the same value
|
|
`wm`.]]
|
|
[[Throws:] [`invalid_argument` if `lwm > hwm`.]]
|
|
[[Notes:] [Once the number of values in the channel reaches `hwm`, any call to
|
|
`push()`, `push_wait_for()` or `push_wait_until()` will block until the number
|
|
of values in the channel has dropped below `lwm`. That is, if `lwm < hwm`, the
|
|
channel can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()`
|
|
calls will block (`is_full()` returns `true`) even though the number of values
|
|
in the channel is less than `hwm`.]]
|
|
]
|
|
|
|
[member_heading bounded_channel..upper_bound]
|
|
|
|
std::size_t upper_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the high-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[member_heading bounded_channel..lower_bound]
|
|
|
|
std::size_t lower_bound() const;
|
|
|
|
[variablelist
|
|
[[Returns:] [the low-watermark with which `*this` was constructed.]]
|
|
[[Throws:] [Nothing.]]
|
|
]
|
|
|
|
[xchannel_is_closed bounded_channel]
|
|
[xchannel_close bounded_channel]
|
|
[xchannel_is_empty bounded_channel]
|
|
|
|
[member_heading bounded_channel..is_full]
|
|
|
|
bool is_full() const;
|
|
|
|
[variablelist
|
|
[[Effects:] [Returns `true` if the channel cannot accept more data at present.
|
|
This happens when the number of values in the channel reaches `wm` or `hwm`.
|
|
Once the channel becomes full, however, it continues refusing new values until
|
|
the number of values drops below `lwm`.]]
|
|
[[Throws:] [Nothing.]]
|
|
[[Note:] [This condition is transient.]]
|
|
]
|
|
|
|
[template bounded_channel_push_effects[or] [xchannel_push_effects If `(!
|
|
this->is_full())`, enqueues] Otherwise the calling fiber is suspended until
|
|
the number of values in the channel drops below `lwm` (return value
|
|
`success`)[or] the channel is `close()`d (return value `closed`)]
|
|
|
|
[member_heading bounded_channel..push]
|
|
|
|
channel_op_status push( value_type const& va);
|
|
channel_op_status push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [[bounded_channel_push_effects or].]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..push_wait_for]
|
|
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type const& va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
template< typename Rep, typename Period >
|
|
channel_op_status push_wait_for( value_type && va,
|
|
std::chrono::duration< Rep, Period > const&
|
|
timeout_duration);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts `std::chrono::duration` and internally computes a
|
|
time_point as (system time + `timeout_duration`).
|
|
[bounded_channel_push_effects ,], or the system time reaches the computed
|
|
time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..push_wait_until]
|
|
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type const& va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
template< typename Clock, typename Duration >
|
|
channel_op_status push_wait_until( value_type && va,
|
|
std::chrono::time_point< Clock, Duration > const&
|
|
timeout_time);
|
|
|
|
[variablelist
|
|
[[Effects:] [Accepts an absolute `timeout_time` in any supported time_point
|
|
type. [bounded_channel_push_effects ,], or the system time reaches the passed
|
|
time_point (return value `timeout`).]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[member_heading bounded_channel..try_push]
|
|
|
|
channel_op_status try_push( value_type const& va);
|
|
channel_op_status try_push( value_type && va);
|
|
|
|
[variablelist
|
|
[[Effects:] [If `this->is_full()`, returns `full`.
|
|
[xchannel_push_effects Otherwise enqueues]]]
|
|
[[Throws:] [__fiber_interrupted__]]
|
|
]
|
|
|
|
[template bounded_pop_unblocking[] Once the number of items remaining in the
|
|
channel drops below `lwm`, any fibers blocked on `push()`, `push_wait_for()`
|
|
or `push_wait_until()` may resume.]
|
|
|
|
[xchannel_pop bounded_channel... [bounded_pop_unblocking]]
|
|
[xchannel_value_pop bounded_channel... [bounded_pop_unblocking]]
|
|
[xchannel_try_pop bounded_channel... [bounded_pop_unblocking]]
|
|
[xchannel_pop_wait_for bounded_channel... [bounded_pop_unblocking]]
|
|
[xchannel_pop_wait_until bounded_channel... [bounded_pop_unblocking]]
|
|
|
|
[endsect]
|