mirror of
https://github.com/boostorg/thread.git
synced 2026-02-21 03:22:10 +00:00
415 lines
12 KiB
Plaintext
415 lines
12 KiB
Plaintext
[section:synchronized_queues Synchronized Queues -- EXPERIMENTAL]
|
|
|
|
[warning These features are experimental and subject to change in future versions. There are not too much tests yet, so it is possible that you can find out some trivial bugs :(]
|
|
|
|
[note These features are based on the [@http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2013/n3533.html [*N3533 - C++ Concurrent Queues]] C++1y proposal from Lawrence Crowl and Chris Mysen and [@http://www.manning.com/williams/ [*C++ Concurrency in Action]] from Anthony Williams.]
|
|
|
|
|
|
[section:tutorial Tutorial]
|
|
|
|
Concurrent queues are a well know mechanism for communicating data between different threads.
|
|
|
|
Concurrent queues have inherently copy/move semantics for the data handling operation. Reference-returning interfaces are forbidden as multiple access to these references can not be thread-safe.
|
|
|
|
[endsect]
|
|
|
|
[section:ref Reference]
|
|
|
|
[section:sync_queue_req Synchronized Queue Model]
|
|
|
|
The BasicConcurrentQueue concept models .
|
|
A type `Q` meets the BasicConcurrentQueue requirements if the following expressions are well-formed and have the specified semantics
|
|
|
|
* `q` denotes a value of type `Q`,
|
|
* `e` denotes a value of type Q::value_type,
|
|
* `u` denotes a value of type Q::size_type,
|
|
* `lve` denotes a lvalue referece of type Q::value_type,
|
|
* `rve` denotes a rvalue referece of type Q::value_type:
|
|
* `spe` denotes a shared_ptr<Q::value_type>
|
|
|
|
Basic expressions
|
|
|
|
* Q::value_type
|
|
* Q::size_type
|
|
* `q.push(e);`
|
|
* `q.push(rve);`
|
|
* `q.pull(lre);`
|
|
* `lre = q.pull();`
|
|
* `spe = q.ptr_pull();`
|
|
* `b = q.empty();`
|
|
* `u = q.size();`
|
|
|
|
Non-waiting operations
|
|
|
|
* `b = q.try_push(e);`
|
|
* `b = q.try_push(rve);`
|
|
* `b = q.try_pull(lre);`
|
|
|
|
Non-blocking operations are provided by BlockingQueues
|
|
|
|
* `b = q.try_push(nb, e);`
|
|
* `b = q.try_push(nb, rve);`
|
|
* `b = q.try_pull(nb, lre);`
|
|
|
|
Bounded queues add the following valid expressions
|
|
|
|
* `Q q(u);`
|
|
* `b = q.full();`
|
|
* `u = q.capacity();`
|
|
|
|
Closed queues
|
|
|
|
Threads using a queue for communication need some mechanism to signal when the queue is no longer needed. The usual approach is add an additional out-of-band signal. However, this approach suffers from the flaw that threads waiting on either full or empty queues need to be woken up when the queue is no longer needed. Rather than require an out-of-band signal, we chose to directly support such a signal in the queue itself, which considerably simplifies coding.
|
|
|
|
To achieve this signal, a thread may close a queue. Once closed, no new elements may be pushed onto the queue. Push operations on a closed queue will either return queue_op_status::closed (when they have a queue_op_status return type), set the closed parameter if it has one or throw sync_queue::closed (when they do not). Elements already on the queue may be popped off. When a queue is empty and closed, pop operations will either return queue_op_status::closed (when they have a status return) or throw queue_op_status::closed (when they do not).
|
|
|
|
* `q.close();`
|
|
* `b = q.closed();`
|
|
|
|
Basic expressions
|
|
|
|
* Q::value_type
|
|
* `q.push(e,c);`
|
|
* `q.push(rve,c);`
|
|
* `q.pull(lre,c);`
|
|
* `spe = q.ptr_pull(c);`
|
|
|
|
Non-waiting operations
|
|
|
|
* `b = q.try_push(e, c);`
|
|
* `b = q.try_push(rve, c);`
|
|
* `b = q.try_pull(lre, c);`
|
|
|
|
Non-blocking operations are provided by BlockingQueues
|
|
|
|
* `b = q.try_push(nb, e, c);`
|
|
* `b = q.try_push(nb, rve, c);`
|
|
* `b = q.try_pull(nb, lre, c);`
|
|
|
|
|
|
[section:exception Blocking Queues Throw specification]
|
|
|
|
All the functions that can throw are defined as if we had in addition to its specific Throw specification the following:
|
|
|
|
[variablelist
|
|
[[Throws:] [Any exception thrown by the internal locking.]]
|
|
]
|
|
|
|
[endsect]
|
|
|
|
|
|
[section:exception2 Closed Queues Throw specification]
|
|
|
|
All the modifier functions that have no a specific closed parameter can throw are defined as if we had in addition to its specific Throw specification the following:
|
|
|
|
[variablelist
|
|
[[Throws:] [Any exception thrown by the internal locking.]]
|
|
]
|
|
|
|
[endsect]
|
|
|
|
|
|
[section:push `q.push(e);`]
|
|
|
|
[variablelist
|
|
|
|
[[Effects:] [Waits until the queue is not full (for bounded queues) and then push the `e` onto the queue copying it.]]
|
|
|
|
[[Synchronization:] [Prior pull-like operations on the same object synchronizes with this operation.]]
|
|
|
|
[[Postcondition:] [`! q.empty()`.]]
|
|
|
|
[[Return type:] [`void`.]]
|
|
|
|
[[Throws:] [If the queue was closed, throws sync_queue_is_closed. Any exception thrown by the the copy of `e`.]]
|
|
|
|
[[Exception safety:] [If an exception is thrown then the queue state is unmodified.]]
|
|
|
|
]
|
|
|
|
[endsect]
|
|
[section:push_m `q.push(rve);`]
|
|
|
|
[variablelist
|
|
|
|
[[Effects:] [Waits until the queue is not full (for bounded queues) and then push the `e` onto the queue moving it.]]
|
|
|
|
[[Synchronization:] [Prior pull-like operations on the same object synchronizes with this operation.]]
|
|
|
|
[[Postcondition:] [`! q.empty()`.]]
|
|
|
|
[[Return type:] [`void`.]]
|
|
|
|
[[Throws:] [If the queue is closed, throws sync_queue_is_closed. Any exception thrown by the the copy of `e`.]]
|
|
|
|
[[Exception safety:] [If an exception is thrown then the queue state is unmodified.]]
|
|
|
|
]
|
|
|
|
[endsect]
|
|
|
|
[section:try_push `q.try_push(e);`]
|
|
|
|
[variablelist
|
|
|
|
[[Effects:] [If the queue `q` is not full, push the `e` onto the queue copying it.]]
|
|
|
|
[[Synchronization:] [Prior pull-like operations on the same object synchronizes with this operation when the operation succeeds. ]]
|
|
|
|
[[Return type:] [`bool`.]]
|
|
|
|
[[Return:] [If the queue `q` is full return `false`, otherwise return `true`;]]
|
|
|
|
[[Postcondition:] [If the call returns `true`, `! q.empty()`.]]
|
|
|
|
[[Throws:] [If the queue is closed, throws sync_queue_is_closed. Any exception thrown by the the copy of `e`.]]
|
|
|
|
[[Exception safety:] [If an exception is thrown then the queue state is unmodified.]]
|
|
|
|
]
|
|
|
|
[endsect]
|
|
[section:try_push_m `q.try_push(rve());`]
|
|
|
|
[variablelist
|
|
|
|
[[Effects:] [If the queue `q` is not full, push the `e` onto the queue moving it.]]
|
|
|
|
[[Synchronization:] [Prior pull-like operations on the same object synchronizes with this operation.]]
|
|
|
|
[[Return type:] [`bool`.]]
|
|
|
|
[[Return:] [If the queue `q` is full return `false`, otherwise return `true`;]]
|
|
|
|
[[Postcondition:] [If the call returns `true`, `! q.empty()`.]]
|
|
|
|
[[Throws:] [If the queue is closed, throws sync_queue_is_closed. Any exception thrown by the the copy of `e`.]]
|
|
|
|
[[Exception safety:] [If an exception is thrown then the queue state is unmodified.]]
|
|
|
|
]
|
|
|
|
[endsect]
|
|
[section:pull `e = q.pull()`]
|
|
|
|
[variablelist
|
|
|
|
[[Requires:] [Q::value_type is no throw copy movable. This is needed to ensure the exception safety]]
|
|
|
|
[[Effects:] [Waits until the queue is not empty and then pull the element from the queue `q` and moves the pulled element.]]
|
|
|
|
[[Synchronization:] [Prior pull-like operations on the same object synchronizes with this operation.]]
|
|
|
|
[[Postcondition:] [`! q.full()`.]]
|
|
|
|
[[Return type:] [`Q::value_type`.]]
|
|
|
|
[[Return:] [The pulled element.]]
|
|
|
|
[[Throws:] [Any exception thrown by the copy of `e`.]]
|
|
|
|
[[Exception safety:] [If an exception is thrown then the queue state is unmodified.]]
|
|
|
|
]
|
|
|
|
[endsect]
|
|
|
|
|
|
[endsect]
|
|
|
|
[section:sync_bounded_queue_ref Synchronized Bounded Queue]
|
|
|
|
#include <boost/thread/sync_bounded_queue.hpp>
|
|
|
|
namespace boost
|
|
{
|
|
struct sync_queue_is_closed : std::exception {};
|
|
|
|
template <typename ValueType>
|
|
class sync_bounded_queue;
|
|
|
|
// Stream-like operators
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType&& elem);
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem);
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem);
|
|
}
|
|
|
|
[section:sync_queue_is_closed Class `sync_queue_is_closed`]
|
|
|
|
#include <boost/thread/sync_bounded_queue.hpp>
|
|
|
|
namespace boost
|
|
{
|
|
struct sync_queue_is_closed : std::exception {};
|
|
}
|
|
|
|
[endsect]
|
|
|
|
[section:sync_bounded_queue Class `sync_bounded_queue<>`]
|
|
|
|
#include <boost/thread/sync_bounded_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
class sync_bounded_queue
|
|
{
|
|
public:
|
|
typedef ValueType value_type;
|
|
typedef std::size_t size_type;
|
|
|
|
sync_bounded_queue(sync_bounded_queue const&) = delete;
|
|
sync_bounded_queue& operator=(sync_bounded_queue const&) = delete;
|
|
explicit sync_bounded_queue(size_type max_elems);
|
|
template <typename Range>
|
|
sync_bounded_queue(size_type max_elems, Range range);
|
|
~sync_bounded_queue();
|
|
|
|
// Observers
|
|
bool empty() const;
|
|
bool full() const;
|
|
size_type capacity() const;
|
|
size_type size() const;
|
|
bool closed() const;
|
|
|
|
// Modifiers
|
|
void push(const value_type& x);
|
|
void push(value_type&& x);
|
|
bool try_push(const value_type& x);
|
|
bool try_push(value_type&& x);
|
|
bool try_push(no_block_tag, const value_type& x);
|
|
bool try_push(no_block_tag, value_type&& x);
|
|
|
|
void pull(value_type&);
|
|
// enable_if is_nothrow_movable<value_type>
|
|
value_type pull();
|
|
shared_ptr<ValueType> ptr_pull();
|
|
bool try_pull(value_type&);
|
|
bool try_pull(no_block_tag,value_type&);
|
|
shared_ptr<ValueType> try_pull();
|
|
|
|
void close();
|
|
};
|
|
}
|
|
|
|
[endsect]
|
|
|
|
[section:stream_out_operators Non-Member Function `operator<<()`]
|
|
|
|
#include <boost/thread/sync_bounded_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType&& elem);
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem);
|
|
}
|
|
|
|
[endsect]
|
|
[section:stream_in_operators Non-Member Function `operator>>()`]
|
|
|
|
#include <boost/thread/sync_bounded_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem);
|
|
}
|
|
|
|
[endsect]
|
|
[endsect]
|
|
|
|
[section:sync_queue_ref Synchronized Unbounded Queue]
|
|
|
|
#include <boost/thread/sync_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
class sync_queue;
|
|
|
|
// Stream-like operators
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, ValueType&& elem);
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, ValueType const&elem);
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator>>(sync_queue<ValueType>& sbq, ValueType &elem);
|
|
}
|
|
|
|
[section:sync_queue Class `sync_queue<>`]
|
|
|
|
#include <boost/thread/sync_queue.hpp>
|
|
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
class sync_queue
|
|
{
|
|
public:
|
|
typedef ValueType value_type;
|
|
typedef std::size_t size_type;
|
|
|
|
sync_queue(sync_queue const&) = delete;
|
|
sync_queue& operator=(sync_queue const&) = delete;
|
|
sync_queue();
|
|
explicit template <typename Range>
|
|
sync_queue(Range range);
|
|
~sync_queue();
|
|
|
|
// Observers
|
|
bool empty() const;
|
|
bool full() const;
|
|
size_type size() const;
|
|
bool closed() const;
|
|
|
|
// Modifiers
|
|
void push(const value_type& x);
|
|
void push(value_type&& x);
|
|
bool try_push(const value_type& x);
|
|
bool try_push(value_type&&) x);
|
|
bool try_push(no_block_tag, const value_type& x);
|
|
bool try_push(no_block_tag, value_type&& x);
|
|
|
|
void pull(value_type&);
|
|
// enable_if is_nothrow_movable<value_type>
|
|
value_type pull();
|
|
shared_ptr<ValueType> ptr_pull();
|
|
bool try_pull(value_type&);
|
|
bool try_pull(no_block_tag,value_type&);
|
|
shared_ptr<ValueType> try_pull();
|
|
|
|
void close();
|
|
};
|
|
}
|
|
|
|
[endsect]
|
|
|
|
[section:stream_out_operators Non-Member Function `operator<<()`]
|
|
|
|
#include <boost/thread/sync_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, ValueType&& elem);
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator<<(sync_queue<ValueType>& sbq, ValueType const&elem);
|
|
}
|
|
|
|
[endsect]
|
|
[section:stream_in_operators Non-Member Function `operator>>()`]
|
|
|
|
#include <boost/thread/sync_queue.hpp>
|
|
namespace boost
|
|
{
|
|
template <typename ValueType>
|
|
sync_queue<ValueType>& operator>>(sync_queue<ValueType>& sbq, ValueType &elem);
|
|
}
|
|
|
|
[endsect]
|
|
|
|
[endsect]
|
|
|
|
[endsect]
|
|
[endsect]
|