From f275125b9353d6766fbdfe33cf376f8300ff776b Mon Sep 17 00:00:00 2001 From: Oliver Kowalke Date: Sun, 2 Aug 2015 20:48:52 +0200 Subject: [PATCH] fixes for unbounded_channel --- doc/channel.qbk | 2 + include/boost/fiber/unbounded_channel.hpp | 31 +- test/Jamfile.v2 | 15 + test/test_unbounded_channel.cpp | 336 ++++++++++++++++++++++ 4 files changed, 379 insertions(+), 5 deletions(-) create mode 100644 test/test_unbounded_channel.cpp diff --git a/doc/channel.qbk b/doc/channel.qbk index 870e07cf..afecee77 100644 --- a/doc/channel.qbk +++ b/doc/channel.qbk @@ -88,6 +88,7 @@ channel operations return the state of the channel. bool is_empty(); + channel_op_status push( value_type const& va); channel_op_status push( value_type && va); channel_op_status pop( value_type & va); @@ -153,6 +154,7 @@ non-empty.]] [member_heading unbounded_channel..push] + channel_op_status push( value_type const& va); channel_op_status push( value_type && va); [variablelist diff --git a/include/boost/fiber/unbounded_channel.hpp b/include/boost/fiber/unbounded_channel.hpp index 13795f0c..8fead881 100644 --- a/include/boost/fiber/unbounded_channel.hpp +++ b/include/boost/fiber/unbounded_channel.hpp @@ -18,10 +18,11 @@ #include #include -#include -#include -#include #include +#include +#include +#include +#include #ifdef BOOST_HAS_ABI_HEADERS # include BOOST_ABI_PREFIX @@ -45,6 +46,13 @@ private: T va; ptr nxt; + explicit node( T const& t, allocator_type & alloc_) : + use_count( 0), + alloc( alloc_), + va( t), + nxt() { + } + explicit node( T && t, allocator_type & alloc_) : use_count( 0), alloc( alloc_), @@ -120,12 +128,12 @@ private: tail_ = & new_node->nxt; } - value_type value_pop_() { + value_type & value_pop_() { BOOST_ASSERT( ! is_empty_() ); try { typename node::ptr old_head = pop_head_(); - return std::move( old_head->va); + return old_head->va; } catch (...) { close_(); throw; @@ -173,6 +181,13 @@ public: return is_empty_(); } + channel_op_status push( value_type const& va) { + typename node::ptr new_node( + new ( alloc_.allocate( 1) ) node( va, alloc_) ); + std::unique_lock< mutex > lk( mtx_); + return push_( new_node, lk); + } + channel_op_status push( value_type && va) { typename node::ptr new_node( new ( alloc_.allocate( 1) ) node( std::forward< value_type >( va), alloc_) ); @@ -213,10 +228,16 @@ public: std::unique_lock< mutex > lk( mtx_); if ( is_closed_() && is_empty_() ) { + // let other fibers run + lk.unlock(); + this_fiber::yield(); return channel_op_status::closed; } if ( is_empty_() ) { + // let other fibers run + lk.unlock(); + this_fiber::yield(); return channel_op_status::empty; } diff --git a/test/Jamfile.v2 b/test/Jamfile.v2 index 312ed7e4..7b27ca64 100644 --- a/test/Jamfile.v2 +++ b/test/Jamfile.v2 @@ -117,6 +117,21 @@ run test_barrier.cpp : cxx11_variadic_templates cxx14_initialized_lambda_captures ] ; +run test_unbounded_channel.cpp : + : : + [ requires cxx11_constexpr + cxx11_decltype + cxx11_deleted_functions + cxx11_explicit_conversion_operators + cxx11_hdr_tuple cxx11_lambdas + cxx11_noexcept + cxx11_nullptr + cxx11_template_aliases + cxx11_rvalue_references + cxx11_variadic_macros + cxx11_variadic_templates + cxx14_initialized_lambda_captures ] ; + run test_fss.cpp : : : [ requires cxx11_constexpr diff --git a/test/test_unbounded_channel.cpp b/test/test_unbounded_channel.cpp new file mode 100644 index 00000000..c2658bfa --- /dev/null +++ b/test/test_unbounded_channel.cpp @@ -0,0 +1,336 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +#include +#include + +#include + +bool value1 = false; + +void test_push() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( 1) ); + BOOST_CHECK( ! c.is_empty() ); +} + +void test_push_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + BOOST_CHECK( boost::fibers::channel_op_status::closed == c.push( 1) ); +} + +void test_pop() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); + BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop( v2) ); +} + +void test_pop_success() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + boost::fibers::fiber f1([&c,&v2](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop( v2) ); + BOOST_CHECK( c.is_empty() ); + }); + boost::fibers::fiber f2([&c,v1](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + }); + f1.join(); + BOOST_CHECK( c.is_empty() ); + f2.join(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_value_pop() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + v2 = c.value_pop(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_value_pop_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + v2 = c.value_pop(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); + bool thrown = false; + try { + c.value_pop(); + } catch ( boost::fibers::fiber_exception const&) { + thrown = true; + } + BOOST_CHECK( thrown); +} + +void test_value_pop_success() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + boost::fibers::fiber f1([&c,&v2](){ + v2 = c.value_pop(); + BOOST_CHECK( c.is_empty() ); + }); + boost::fibers::fiber f2([&c,v1](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + }); + f1.join(); + BOOST_CHECK( c.is_empty() ); + f2.join(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_try_pop() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.try_pop( v2) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_try_pop_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.try_pop( v2) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); + BOOST_CHECK( boost::fibers::channel_op_status::closed == c.try_pop( v2) ); +} + +void test_try_pop_success() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + boost::fibers::fiber f1([&c,&v2](){ + while ( boost::fibers::channel_op_status::success != c.try_pop( v2) ); + BOOST_CHECK( c.is_empty() ); + }); + boost::fibers::fiber f2([&c,v1](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + }); + f1.join(); + BOOST_CHECK( c.is_empty() ); + f2.join(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_wait_for() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_wait_for_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); + BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop_wait_for( v2, std::chrono::seconds( 1) ) ); +} + +void test_pop_wait_for_success() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + boost::fibers::fiber f1([&c,&v2](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_for( v2, std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + }); + boost::fibers::fiber f2([&c,v1](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + }); + f1.join(); + BOOST_CHECK( c.is_empty() ); + f2.join(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_wait_for_timeout() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v = 0; + boost::fibers::fiber f([&c,&v](){ + BOOST_CHECK( boost::fibers::channel_op_status::timeout == c.pop_wait_for( v, std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + }); + f.join(); +} + +void test_pop_wait_until() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2, + std::chrono::system_clock::now() + std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_wait_until_closed() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + BOOST_CHECK( ! c.is_closed() ); + c.close(); + BOOST_CHECK( c.is_closed() ); + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2, + std::chrono::system_clock::now() + std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); + BOOST_CHECK( boost::fibers::channel_op_status::closed == c.pop_wait_until( v2, + std::chrono::system_clock::now() + std::chrono::seconds( 1) ) ); +} + +void test_pop_wait_until_success() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v1 = 2, v2 = 0; + boost::fibers::fiber f1([&c,&v2](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.pop_wait_until( v2, + std::chrono::system_clock::now() + std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + }); + boost::fibers::fiber f2([&c,v1](){ + BOOST_CHECK( boost::fibers::channel_op_status::success == c.push( v1) ); + BOOST_CHECK( ! c.is_empty() ); + }); + f1.join(); + BOOST_CHECK( c.is_empty() ); + f2.join(); + BOOST_CHECK( c.is_empty() ); + BOOST_CHECK_EQUAL( v1, v2); +} + +void test_pop_wait_until_timeout() +{ + boost::fibers::unbounded_channel< int > c; + BOOST_CHECK( c.is_empty() ); + int v = 0; + boost::fibers::fiber f([&c,&v](){ + BOOST_CHECK( boost::fibers::channel_op_status::timeout == c.pop_wait_until( v, + std::chrono::system_clock::now() + std::chrono::seconds( 1) ) ); + BOOST_CHECK( c.is_empty() ); + }); + f.join(); +} + +boost::unit_test::test_suite * init_unit_test_suite( int, char* []) +{ + boost::unit_test::test_suite * test = + BOOST_TEST_SUITE("Boost.Fiber: unbounded_channel test suite"); + + test->add( BOOST_TEST_CASE( & test_push) ); + test->add( BOOST_TEST_CASE( & test_push_closed) ); + test->add( BOOST_TEST_CASE( & test_pop) ); + test->add( BOOST_TEST_CASE( & test_pop_closed) ); + test->add( BOOST_TEST_CASE( & test_pop_success) ); + test->add( BOOST_TEST_CASE( & test_value_pop) ); + test->add( BOOST_TEST_CASE( & test_value_pop_closed) ); + test->add( BOOST_TEST_CASE( & test_value_pop_success) ); + test->add( BOOST_TEST_CASE( & test_try_pop) ); + test->add( BOOST_TEST_CASE( & test_try_pop_closed) ); + test->add( BOOST_TEST_CASE( & test_try_pop_success) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_for) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_for_closed) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_for_success) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_for_timeout) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_until) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_until_closed) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_until_success) ); + test->add( BOOST_TEST_CASE( & test_pop_wait_until_timeout) ); + + return test; +}