From 8feaf646ccb9702f2d808d1291bcca238125e649 Mon Sep 17 00:00:00 2001 From: Jan Gaspar Date: Mon, 19 Dec 2005 23:11:09 +0000 Subject: [PATCH] added bounded buffer [SVN r2785] --- include/boost/circular_buffer/base.hpp | 4 +- test/Jamfile | 1 + test/base_test.cpp | 2 +- test/bounded_buffer.cpp | 97 +++++++++++++++ test/cb_comparison.cpp | 159 +++++++++++++++++++++++++ test/iterator_test.cpp | 2 +- test/space_optimized_test.cpp | 2 +- 7 files changed, 262 insertions(+), 5 deletions(-) create mode 100644 test/bounded_buffer.cpp create mode 100644 test/cb_comparison.cpp diff --git a/include/boost/circular_buffer/base.hpp b/include/boost/circular_buffer/base.hpp index 9b15dd0..ef7dbbb 100644 --- a/include/boost/circular_buffer/base.hpp +++ b/include/boost/circular_buffer/base.hpp @@ -97,10 +97,10 @@ public: // Container specific types - //! An array range. + //! An array range. TODO - better doc typedef std::pair array_range; - //! A range of a const array. + //! A range of a const array. TODO - better doc typedef std::pair const_array_range; // Helper types diff --git a/test/Jamfile b/test/Jamfile index a7580cf..c3d7670 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -14,5 +14,6 @@ DEPENDS all : circular_buffer ; : [ run base_test.cpp ] [ run space_optimized_test.cpp ] [ run iterator_test.cpp ] + [ compile bounded_buffer.cpp ] ; } diff --git a/test/base_test.cpp b/test/base_test.cpp index 66a4b1f..3310e7a 100644 --- a/test/base_test.cpp +++ b/test/base_test.cpp @@ -482,7 +482,7 @@ void exception_safety_test() { } // test main -test_suite* init_unit_test_suite(int argc, char* argv[]) { +test_suite* init_unit_test_suite(int /*argc*/, char*[] /*argv*/) { test_suite* tests = BOOST_TEST_SUITE("Unit tests for the circular_buffer."); add_common_tests(tests); diff --git a/test/bounded_buffer.cpp b/test/bounded_buffer.cpp new file mode 100644 index 0000000..dd7b1e1 --- /dev/null +++ b/test/bounded_buffer.cpp @@ -0,0 +1,97 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + +template +class bounded_buffer { +public: + + typedef boost::circular_buffer buffer_type; + typedef typename buffer_type::size_type size_type; + typedef typename buffer_type::value_type value_type; + + bounded_buffer(size_type capacity) : m_unread(0), m_buffer(capacity) {} + + void push_back(const value_type& item) { + boost::mutex::scoped_lock lock(m_mutex); + m_full_condition.wait(lock, boost::bind(&bounded_buffer::is_not_full, this)); + m_buffer.push_back(item); + ++m_unread; + m_empty_condition.notify_one(); + } + + value_type pop_front() { + boost::mutex::scoped_lock lock(m_mutex); + m_empty_condition.wait(lock, boost::bind(&bounded_buffer::is_not_empty, this)); + value_type& item = m_buffer[m_buffer.size() - (m_unread--)]; + m_full_condition.notify_one(); + return item; + } + +private: + + bool is_not_empty() const { return m_unread > 0; } + bool is_not_full() const { return m_unread < m_buffer.capacity(); } + + size_type m_unread; + buffer_type m_buffer; + boost::mutex m_mutex; + boost::condition m_empty_condition; + boost::condition m_full_condition; +}; + +void go_sleep(unsigned int sec) { + boost::xtime t; + if (boost::TIME_UTC != boost::xtime_get(&t, boost::TIME_UTC)) + throw boost::unsupported_thread_option(); + t.sec += sec; + boost::thread::sleep(t); +} + +void produce(bounded_buffer* buffer) { + go_sleep(1); + std::cout << "producer thread: will write first 50 items"; + std::cout << std::endl << std::endl; + int i = 1; + for (; i <= 50; ++i) { + buffer->push_back(i); + } + go_sleep(1); + std::cout << std::endl << std::endl; + std::cout << "producer thread: will wait for 5 seconds and then write another 50 items" << std::endl; + std::cout << std::endl << std::endl; + go_sleep(5); + for (; i <= 100; ++i) { + buffer->push_back(i); + } +} + +void consume(bounded_buffer* buffer) { + std::cout << "consumer thread: will wait for 5 seconds" << std::endl; + go_sleep(5); + for (int i = 1; i <= 100; ++i) { + std::cout << buffer->pop_front() << ' '; + } + std::cout << std::endl << std::endl; + std::cout << "consumer thread: just have finished reading all 100 items"; + std::cout << std::endl << std::endl; +} + +int main(int argc, char* argv[]) +{ + bounded_buffer buffer(10); + + boost::thread consumer(boost::bind(&consume, &buffer)); + boost::thread producer(boost::bind(&produce, &buffer)); + + consumer.join(); + producer.join(); + + return 0; +} diff --git a/test/cb_comparison.cpp b/test/cb_comparison.cpp new file mode 100644 index 0000000..997c1b2 --- /dev/null +++ b/test/cb_comparison.cpp @@ -0,0 +1,159 @@ + +#define BOOST_CB_DISABLE_DEBUG 1 + + +#include "boost/circular_buffer.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const unsigned int queue_size = 500L; +const unsigned int total_elements = queue_size * 1000L; + +template +class bounded_buffer { +public: + + typedef boost::circular_buffer buffer_type; + typedef typename buffer_type::size_type size_type; + typedef typename buffer_type::value_type value_type; + + bounded_buffer(size_type capacity) : m_unread(0), m_buffer(capacity) {} + + void push_back(const value_type& item) { + boost::mutex::scoped_lock lock(m_mutex); + m_full_condition.wait(lock, boost::bind(&bounded_buffer::is_not_full, this)); + m_empty_condition.notify_one(); + m_buffer.push_back(item); + ++m_unread; + } + + value_type pop_front() { + boost::mutex::scoped_lock lock(m_mutex); + m_empty_condition.wait(lock, boost::bind(&bounded_buffer::is_not_empty, this)); + m_full_condition.notify_one(); + return m_buffer[m_buffer.size() - (m_unread--)]; + } + +private: + + bool is_not_empty() const { return m_unread > 0;} + bool is_not_full() const { return m_unread < m_buffer.capacity();} + + size_type m_unread; + buffer_type m_buffer; + boost::mutex m_mutex; + boost::condition m_empty_condition; + boost::condition m_full_condition; +}; + +template +class deque_bounded_buffer { +public: + + typedef std::deque buffer_type; + typedef typename buffer_type::size_type size_type; + typedef typename buffer_type::value_type value_type; + + deque_bounded_buffer(size_type capacity) : m_capacity(capacity), m_buffer(capacity) {} + + void push_back(const value_type& item) { + boost::mutex::scoped_lock lock(m_mutex); + m_full_condition.wait(lock, boost::bind(&deque_bounded_buffer::is_not_full, this)); + m_empty_condition.notify_one(); + m_buffer.push_back(item); + } + + value_type pop_front() { + boost::mutex::scoped_lock lock(m_mutex); + m_empty_condition.wait(lock, boost::bind(&deque_bounded_buffer::is_not_empty, this)); + m_full_condition.notify_one(); + value_type item = m_buffer.front(); + m_buffer.pop_front(); + return item; + } + +private: + + bool is_not_empty() const { return m_buffer.size() > 0; } + bool is_not_full() const { return m_buffer.size() < m_capacity; } + + const size_type m_capacity; + buffer_type m_buffer; + boost::mutex m_mutex; + boost::condition m_empty_condition; + boost::condition m_full_condition; +}; + +template +class reader { + + typedef typename Queue::value_type value_type; + Queue& m_q; + +public: + reader(Queue& q) : m_q(q) { } + + void operator()() { + for (int i = 0; i < total_elements; ++i) { + value_type item = m_q.pop_front(); + } + } +}; + +template +class writer { + + typedef typename Queue::value_type value_type; + Queue& m_q; + +public: + writer(Queue& q) : m_q(q) {} + + void operator()() { + for (int i = 0; i < total_elements; ++i) { + m_q.push_back(value_type()); + } + } +}; + +template +void fifo_test() +{ + boost::progress_timer t; + + Queue q(queue_size); + + reader reader(q); + writer writer(q); + + boost::thread read(reader); + boost::thread write(writer); + + read.join(); + write.join(); +} + +int main(int argc, char* argv[]) +{ + std::cout << "bounded_buffer: "; + fifo_test< bounded_buffer >(); + + std::cout << "deque_bounded_buffer: "; + fifo_test< deque_bounded_buffer >(); + + std::cout << "bounded_buffer: "; + fifo_test< bounded_buffer >(); + + std::cout << "deque_bounded_buffer: "; + fifo_test< deque_bounded_buffer >(); + + return 0; +} diff --git a/test/iterator_test.cpp b/test/iterator_test.cpp index d6416c2..de0bca6 100644 --- a/test/iterator_test.cpp +++ b/test/iterator_test.cpp @@ -21,7 +21,7 @@ void validity_test() { } // test main -test_suite* init_unit_test_suite(int argc, char* argv[]) { +test_suite* init_unit_test_suite(int /*argc*/, char*[] /*argv*/) { test_suite* tests = BOOST_TEST_SUITE("Unit tests for the iterator of the circular_buffer."); diff --git a/test/space_optimized_test.cpp b/test/space_optimized_test.cpp index e5dac10..7277013 100644 --- a/test/space_optimized_test.cpp +++ b/test/space_optimized_test.cpp @@ -61,7 +61,7 @@ void min_capacity_test() { } // test main -test_suite* init_unit_test_suite(int argc, char* argv[]) { +test_suite* init_unit_test_suite(int /*argc*/, char*[] /*argv*/) { test_suite* tests = BOOST_TEST_SUITE("Unit tests for the circular_buffer_space_optimized."); add_common_tests(tests);