2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-13 12:22:36 +00:00

fixes for unbounded_channel

This commit is contained in:
Oliver Kowalke
2015-08-02 20:48:52 +02:00
parent 6a0b14e3e8
commit f275125b93
4 changed files with 379 additions and 5 deletions

View File

@@ -88,6 +88,7 @@ channel operations return the state of the channel.
bool is_empty();
channel_op_status push( value_type const& va);
channel_op_status push( value_type && va);
channel_op_status pop( value_type & va);
@@ -153,6 +154,7 @@ non-empty.]]
[member_heading unbounded_channel..push]
channel_op_status push( value_type const& va);
channel_op_status push( value_type && va);
[variablelist

View File

@@ -18,10 +18,11 @@
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/condition.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/condition.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
@@ -45,6 +46,13 @@ private:
T va;
ptr nxt;
explicit node( T const& t, allocator_type & alloc_) :
use_count( 0),
alloc( alloc_),
va( t),
nxt() {
}
explicit node( T && t, allocator_type & alloc_) :
use_count( 0),
alloc( alloc_),
@@ -120,12 +128,12 @@ private:
tail_ = & new_node->nxt;
}
value_type value_pop_() {
value_type & value_pop_() {
BOOST_ASSERT( ! is_empty_() );
try {
typename node::ptr old_head = pop_head_();
return std::move( old_head->va);
return old_head->va;
} catch (...) {
close_();
throw;
@@ -173,6 +181,13 @@ public:
return is_empty_();
}
channel_op_status push( value_type const& va) {
typename node::ptr new_node(
new ( alloc_.allocate( 1) ) node( va, alloc_) );
std::unique_lock< mutex > lk( mtx_);
return push_( new_node, lk);
}
channel_op_status push( value_type && va) {
typename node::ptr new_node(
new ( alloc_.allocate( 1) ) node( std::forward< value_type >( va), alloc_) );
@@ -213,10 +228,16 @@ public:
std::unique_lock< mutex > lk( mtx_);
if ( is_closed_() && is_empty_() ) {
// let other fibers run
lk.unlock();
this_fiber::yield();
return channel_op_status::closed;
}
if ( is_empty_() ) {
// let other fibers run
lk.unlock();
this_fiber::yield();
return channel_op_status::empty;
}

View File

@@ -117,6 +117,21 @@ run test_barrier.cpp :
cxx11_variadic_templates
cxx14_initialized_lambda_captures ] ;
run test_unbounded_channel.cpp :
: :
[ requires cxx11_constexpr
cxx11_decltype
cxx11_deleted_functions
cxx11_explicit_conversion_operators
cxx11_hdr_tuple cxx11_lambdas
cxx11_noexcept
cxx11_nullptr
cxx11_template_aliases
cxx11_rvalue_references
cxx11_variadic_macros
cxx11_variadic_templates
cxx14_initialized_lambda_captures ] ;
run test_fss.cpp :
: :
[ requires cxx11_constexpr

View File

@@ -0,0 +1,336 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <sstream>
#include <string>
#include <boost/assert.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/fiber/all.hpp>
bool value1 = false;
void test_push()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( 1) );
BOOST_CHECK( ! c.is_empty() );
}
void test_push_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
BOOST_CHECK( boost::fibers::channel_op_status::closed == c.push( 1) );
}
void test_pop()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop( v2) );
}
void test_pop_success()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
boost::fibers::fiber f1([&c,&v2](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) );
BOOST_CHECK( c.is_empty() );
});
boost::fibers::fiber f2([&c,v1](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
});
f1.join();
BOOST_CHECK( c.is_empty() );
f2.join();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_value_pop()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
v2 = c.value_pop();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_value_pop_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
v2 = c.value_pop();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
bool thrown = false;
try {
c.value_pop();
} catch ( boost::fibers::fiber_exception const&) {
thrown = true;
}
BOOST_CHECK( thrown);
}
void test_value_pop_success()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
boost::fibers::fiber f1([&c,&v2](){
v2 = c.value_pop();
BOOST_CHECK( c.is_empty() );
});
boost::fibers::fiber f2([&c,v1](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
});
f1.join();
BOOST_CHECK( c.is_empty() );
f2.join();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_try_pop()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.try_pop( v2) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_try_pop_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.try_pop( v2) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
BOOST_CHECK( boost::fibers::channel_op_status::closed == c.try_pop( v2) );
}
void test_try_pop_success()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
boost::fibers::fiber f1([&c,&v2](){
while ( boost::fibers::channel_op_status::success != c.try_pop( v2) );
BOOST_CHECK( c.is_empty() );
});
boost::fibers::fiber f2([&c,v1](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
});
f1.join();
BOOST_CHECK( c.is_empty() );
f2.join();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_wait_for()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_wait_for_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop_wait_for( v2, std::chrono::seconds( 1) ) );
}
void test_pop_wait_for_success()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
boost::fibers::fiber f1([&c,&v2](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
});
boost::fibers::fiber f2([&c,v1](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
});
f1.join();
BOOST_CHECK( c.is_empty() );
f2.join();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_wait_for_timeout()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v = 0;
boost::fibers::fiber f([&c,&v](){
BOOST_CHECK( boost::fibers::channel_op_status::timeout == c.pop_wait_for( v, std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
});
f.join();
}
void test_pop_wait_until()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2,
std::chrono::system_clock::now() + std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_wait_until_closed()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
BOOST_CHECK( ! c.is_closed() );
c.close();
BOOST_CHECK( c.is_closed() );
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2,
std::chrono::system_clock::now() + std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop_wait_until( v2,
std::chrono::system_clock::now() + std::chrono::seconds( 1) ) );
}
void test_pop_wait_until_success()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v1 = 2, v2 = 0;
boost::fibers::fiber f1([&c,&v2](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2,
std::chrono::system_clock::now() + std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
});
boost::fibers::fiber f2([&c,v1](){
BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) );
BOOST_CHECK( ! c.is_empty() );
});
f1.join();
BOOST_CHECK( c.is_empty() );
f2.join();
BOOST_CHECK( c.is_empty() );
BOOST_CHECK_EQUAL( v1, v2);
}
void test_pop_wait_until_timeout()
{
boost::fibers::unbounded_channel< int > c;
BOOST_CHECK( c.is_empty() );
int v = 0;
boost::fibers::fiber f([&c,&v](){
BOOST_CHECK( boost::fibers::channel_op_status::timeout == c.pop_wait_until( v,
std::chrono::system_clock::now() + std::chrono::seconds( 1) ) );
BOOST_CHECK( c.is_empty() );
});
f.join();
}
boost::unit_test::test_suite * init_unit_test_suite( int, char* [])
{
boost::unit_test::test_suite * test =
BOOST_TEST_SUITE("Boost.Fiber: unbounded_channel test suite");
test->add( BOOST_TEST_CASE( & test_push) );
test->add( BOOST_TEST_CASE( & test_push_closed) );
test->add( BOOST_TEST_CASE( & test_pop) );
test->add( BOOST_TEST_CASE( & test_pop_closed) );
test->add( BOOST_TEST_CASE( & test_pop_success) );
test->add( BOOST_TEST_CASE( & test_value_pop) );
test->add( BOOST_TEST_CASE( & test_value_pop_closed) );
test->add( BOOST_TEST_CASE( & test_value_pop_success) );
test->add( BOOST_TEST_CASE( & test_try_pop) );
test->add( BOOST_TEST_CASE( & test_try_pop_closed) );
test->add( BOOST_TEST_CASE( & test_try_pop_success) );
test->add( BOOST_TEST_CASE( & test_pop_wait_for) );
test->add( BOOST_TEST_CASE( & test_pop_wait_for_closed) );
test->add( BOOST_TEST_CASE( & test_pop_wait_for_success) );
test->add( BOOST_TEST_CASE( & test_pop_wait_for_timeout) );
test->add( BOOST_TEST_CASE( & test_pop_wait_until) );
test->add( BOOST_TEST_CASE( & test_pop_wait_until_closed) );
test->add( BOOST_TEST_CASE( & test_pop_wait_until_success) );
test->add( BOOST_TEST_CASE( & test_pop_wait_until_timeout) );
return test;
}