From 85092ef741f6481953be86688bb629f4635e9c3f Mon Sep 17 00:00:00 2001 From: Andrey Semashev Date: Sun, 13 Mar 2016 15:58:11 +0300 Subject: [PATCH] Added a new queue overflow policy which allows to return an error code if the queue is full. Added a test for the new policy. Changed the interface slightly to use the common message size typedef and use this typedef to check the message size before enqueueing it. --- include/boost/log/exceptions.hpp | 5 ++ .../sinks/text_ipc_message_queue_backend.hpp | 10 ++- .../utility/ipc/reliable_message_queue.hpp | 68 +++++++++------- src/exceptions.cpp | 16 ++++ src/posix/ipc_reliable_message_queue.cpp | 81 ++++++++++--------- src/windows/ipc_reliable_message_queue.cpp | 79 +++++++++--------- test/run/util_ipc_reliable_mq.cpp | 11 +++ 7 files changed, 162 insertions(+), 108 deletions(-) diff --git a/include/boost/log/exceptions.hpp b/include/boost/log/exceptions.hpp index 2c5476f..32d55d4 100644 --- a/include/boost/log/exceptions.hpp +++ b/include/boost/log/exceptions.hpp @@ -332,6 +332,11 @@ public: * Destructor */ ~logic_error() throw(); + +#ifndef BOOST_LOG_DOXYGEN_PASS + static BOOST_LOG_NORETURN void throw_(const char* file, std::size_t line, const char* descr); + static BOOST_LOG_NORETURN void throw_(const char* file, std::size_t line, std::string const& descr); +#endif }; /*! diff --git a/include/boost/log/sinks/text_ipc_message_queue_backend.hpp b/include/boost/log/sinks/text_ipc_message_queue_backend.hpp index b7096f7..370ccfb 100644 --- a/include/boost/log/sinks/text_ipc_message_queue_backend.hpp +++ b/include/boost/log/sinks/text_ipc_message_queue_backend.hpp @@ -18,6 +18,7 @@ #ifndef BOOST_LOG_SINKS_TEXT_IPC_MESSAGE_QUEUE_BACKEND_HPP_INCLUDED_ #define BOOST_LOG_SINKS_TEXT_IPC_MESSAGE_QUEUE_BACKEND_HPP_INCLUDED_ +#include #include #include #include @@ -28,6 +29,7 @@ #include #include #include +#include #include #ifdef BOOST_HAS_PRAGMA_ONCE @@ -145,7 +147,13 @@ public: void consume(record_view const&, string_type const& formatted_message) { if (m_queue.is_open()) - m_queue.send(formatted_message.data(), static_cast< uint32_t >(formatted_message.size())); + { + typedef typename queue_type::size_type size_type; + const string_type::size_type size = formatted_message.size(); + if (BOOST_UNLIKELY(size > static_cast< string_type::size_type >((std::numeric_limits< size_type >::max)()))) + BOOST_LOG_THROW_DESCR(limitation_error, "Message too long to send to an interprocess queue"); + m_queue.send(formatted_message.data(), static_cast< size_type >(size)); + } } }; diff --git a/include/boost/log/utility/ipc/reliable_message_queue.hpp b/include/boost/log/utility/ipc/reliable_message_queue.hpp index 0a6abe8..c3663db 100644 --- a/include/boost/log/utility/ipc/reliable_message_queue.hpp +++ b/include/boost/log/utility/ipc/reliable_message_queue.hpp @@ -75,10 +75,10 @@ struct enable_if_byte< unsigned char, R > { typedef R type; }; * The queue is considered empty when no messages are enqueued (all blocks are free). The queue is considered full at the point * of enqueueing a message when there is not enough free blocks to accommodate the message. * - * The queue is reliable in that it will not drop messages that are not received by the reader, other than the case when a - * non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is full, - * the queue can either block the writer or throw an exception, depending on the policy specified at the queue creation. - * The policy is object local, i.e. different writers and the reader can have different overflow policies. + * The queue is reliable in that it will not drop successfully sent messages that are not received by the reader, other than the + * case when a non-empty queue is destroyed by the last user. If a message cannot be enqueued by the writer because the queue is + * full, the queue can either block the writer or return an error or throw an exception, depending on the policy specified at + * the queue creation. The policy is object local, i.e. different writers and the reader can have different overflow policies. * * If the queue is empty and the reader attempts to dequeue a message, it will block until a message is enqueued by a writer. * @@ -87,8 +87,8 @@ struct enable_if_byte< unsigned char, R > { typedef R type; }; * or other processes) are unaffected. In order to restore the normal functioning of the queue instance after the \c stop_local * call the user has to invoke \c reset_local. * - * The queue does not guarantee any particular order of received messages from different writers. Messages sent by a particular - * writer will be received in the order of sending. + * The queue does not guarantee any particular order of received messages from different writer threads. Messages sent by a + * particular writer thread will be received in the order of sending. * * Methods of this class are not thread-safe, unless otherwise specified. */ @@ -99,6 +99,7 @@ public: enum operation_result { succeeded, //!< The operation has completed successfully + no_space, //!< The message could not be sent because the queue is full aborted //!< The operation has been aborted because the queue method stop_local() has been called }; @@ -107,21 +108,26 @@ public: { //! Block the send operation when the queue is full block_on_overflow, + //! Return \c operation_result::no_space when the queue is full + fail_on_overflow, //! Throw an exception when the queue is full throw_on_overflow }; + //! Queue message size type + typedef uint32_t size_type; + #if !defined(BOOST_LOG_DOXYGEN_PASS) BOOST_MOVABLE_BUT_NOT_COPYABLE(reliable_message_queue) private: - typedef void (*receive_handler)(void* state, const void* data, uint32_t size); + typedef void (*receive_handler)(void* state, const void* data, size_type size); struct fixed_buffer_state { uint8_t* data; - uint32_t size; + size_type size; }; struct implementation; @@ -158,7 +164,7 @@ public: open_mode::create_only_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const& perms = permissions() ) : @@ -187,7 +193,7 @@ public: open_mode::open_or_create_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const& perms = permissions() ) : @@ -318,7 +324,7 @@ public: ( object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const& perms = permissions() ); @@ -343,7 +349,7 @@ public: ( object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy = block_on_overflow, permissions const& perms = permissions() ); @@ -419,7 +425,7 @@ public: * * \return Allocation block size, in bytes. */ - BOOST_LOG_API uint32_t block_size() const; + BOOST_LOG_API size_type block_size() const; /*! * The method wakes up all threads that are blocked in calls to send() or @@ -486,13 +492,14 @@ public: * \param message_size Size of the message data in bytes. If the size is larger than * the associated message queue capacity, an std::logic_error exception is thrown. * - * \return \c operation_result::succeeded if the operation is successful, or - * \c operation_result::aborted if stop_local() was called. + * \retval \c operation_result::succeeded if the operation is successful + * \retval \c operation_result::no_space if \c overflow_policy::fail_on_overflow is in effect and the queue is full + * \retval \c operation_result::aborted if the call was interrupted by stop_local() * * Throws: std::logic_error in case if the message size exceeds the queue * capacity, system_error in case if a native OS method fails. */ - BOOST_LOG_API operation_result send(void const* message_data, uint32_t message_size); + BOOST_LOG_API operation_result send(void const* message_data, size_type message_size); /*! * The method performs an attempt to send a message to the associated message queue. @@ -517,7 +524,7 @@ public: * Throws: std::logic_error in case if the message size exceeds the queue * capacity, system_error in case if a native OS method fails. */ - BOOST_LOG_API bool try_send(void const* message_data, uint32_t message_size); + BOOST_LOG_API bool try_send(void const* message_data, size_type message_size); /*! * The method takes a message from the associated message queue. When the object is in @@ -535,10 +542,10 @@ public: * \param buffer_size The size of the buffer, in bytes. * \param message_size Receives the size of the received message, in bytes. * - * \return \c operation_result::succeeded if the operation is successful, and \c operation_result::aborted - * if the call was interrupted by stop_local(). + * \retval \c operation_result::succeeded if the operation is successful + * \retval \c operation_result::aborted if the call was interrupted by stop_local() */ - operation_result receive(void* buffer, uint32_t buffer_size, uint32_t& message_size) + operation_result receive(void* buffer, size_type buffer_size, size_type& message_size) { fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size }; operation_result result = do_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state); @@ -561,16 +568,16 @@ public: * \param buffer The memory buffer to store the received message in. * \param message_size Receives the size of the received message, in bytes. * - * \return \c operation_result::succeeded if the operation is successful, and \c operation_result::aborted - * if the call was interrupted by stop_local(). + * \retval \c operation_result::succeeded if the operation is successful + * \retval \c operation_result::aborted if the call was interrupted by stop_local() */ - template< typename ElementT, uint32_t SizeV > + template< typename ElementT, size_type SizeV > #if !defined(BOOST_LOG_DOXYGEN_PASS) typename aux::enable_if_byte< ElementT, operation_result >::type #else operation_result #endif - receive(ElementT (&buffer)[SizeV], uint32_t& message_size) + receive(ElementT (&buffer)[SizeV], size_type& message_size) { return receive(buffer, SizeV, message_size); } @@ -591,7 +598,8 @@ public: * value type of char, signed char or unsigned char * and support inserting elements at the end. * - * \return \c true if the operation is successful, and \c false otherwise. + * \retval \c operation_result::succeeded if the operation is successful + * \retval \c operation_result::aborted if the call was interrupted by stop_local() */ template< typename ContainerT > #if !defined(BOOST_LOG_DOXYGEN_PASS) @@ -620,7 +628,7 @@ public: * \return \c true if a message is successfully received, and \c false otherwise (e.g., * when the queue is empty). */ - bool try_receive(void* buffer, uint32_t buffer_size, uint32_t& message_size) + bool try_receive(void* buffer, size_type buffer_size, size_type& message_size) { fixed_buffer_state state = { static_cast< uint8_t* >(buffer), buffer_size }; bool result = do_try_receive(&reliable_message_queue::fixed_buffer_receive_handler, &state); @@ -643,13 +651,13 @@ public: * \return \c true if a message is successfully received, and \c false otherwise (e.g., * when the queue is empty). */ - template< typename ElementT, uint32_t SizeV > + template< typename ElementT, size_type SizeV > #if !defined(BOOST_LOG_DOXYGEN_PASS) typename aux::enable_if_byte< ElementT, bool >::type #else bool #endif - try_receive(ElementT (&buffer)[SizeV], uint32_t& message_size) + try_receive(ElementT (&buffer)[SizeV], size_type& message_size) { return try_receive(buffer, SizeV, message_size); } @@ -742,10 +750,10 @@ private: BOOST_LOG_API bool do_try_receive(receive_handler handler, void* state); //! Fixed buffer receive handler - static BOOST_LOG_API void fixed_buffer_receive_handler(void* state, const void* data, uint32_t size); + static BOOST_LOG_API void fixed_buffer_receive_handler(void* state, const void* data, size_type size); //! Receive handler for a container template< typename ContainerT > - static void container_receive_handler(void* state, const void* data, uint32_t size) + static void container_receive_handler(void* state, const void* data, size_type size) { ContainerT* const container = static_cast< ContainerT* >(state); container->insert diff --git a/src/exceptions.cpp b/src/exceptions.cpp index 6a580f2..779b431 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -467,6 +467,22 @@ logic_error::~logic_error() throw() { } +void logic_error::throw_(const char* file, std::size_t line, const char* descr) +{ + boost::throw_exception(boost::enable_error_info(logic_error(descr)) + << boost::throw_file(file) + << boost::throw_line(line) + ); +} + +void logic_error::throw_(const char* file, std::size_t line, std::string const& descr) +{ + boost::throw_exception(boost::enable_error_info(logic_error(descr)) + << boost::throw_file(file) + << boost::throw_line(line) + ); +} + odr_violation::odr_violation() : logic_error("ODR violation detected") { diff --git a/src/posix/ipc_reliable_message_queue.cpp b/src/posix/ipc_reliable_message_queue.cpp index 7cdafcf..07d61fa 100644 --- a/src/posix/ipc_reliable_message_queue.cpp +++ b/src/posix/ipc_reliable_message_queue.cpp @@ -78,12 +78,12 @@ private: enum { data_alignment = 32u }; //! Size of the element data, in bytes - uint32_t m_size; + size_type m_size; //! Returns the block header overhead, in bytes - static BOOST_CONSTEXPR uint32_t get_header_overhead() BOOST_NOEXCEPT + static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT { - return boost::alignment::align_up(sizeof(block_header), data_alignment); + return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment)); } //! Returns a pointer to the element data @@ -110,7 +110,7 @@ private: //! Number of allocation blocks in the queue. const uint32_t m_capacity; //! Size of an allocation block, in bytes. - const uint32_t m_block_size; + const size_type m_block_size; //! Mutex for protecting queue data structures. boost::log::ipc::aux::interprocess_mutex m_mutex; //! Condition variable used to block readers when the queue is empty. @@ -124,7 +124,7 @@ private: //! The current reading position (allocation block index). uint32_t m_get_pos; - header(uint32_t capacity, uint32_t block_size) : + header(uint32_t capacity, size_type block_size) : m_abi_tag(get_abi_tag()), m_capacity(capacity), m_block_size(block_size), @@ -195,7 +195,7 @@ private: //! Queue overflow handling policy const overflow_policy m_overflow_policy; //! The mask for selecting bits that constitute size values from 0 to (block_size - 1) - uint32_t m_block_size_mask; + size_type m_block_size_mask; //! The number of the bit set in block_size (i.e. log base 2 of block_size) uint32_t m_block_size_log2; //! The flag indicates that stop has been requested @@ -211,7 +211,7 @@ public: open_mode::create_only_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy, permissions const& perms ) : @@ -232,7 +232,7 @@ public: open_mode::open_or_create_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy, permissions const& perms ) : @@ -288,19 +288,19 @@ public: return get_header()->m_capacity; } - uint32_t block_size() const BOOST_NOEXCEPT + size_type block_size() const BOOST_NOEXCEPT { return get_header()->m_block_size; } - operation_result send(void const* message_data, uint32_t message_size) + operation_result send(void const* message_data, size_type message_size) { const uint32_t block_count = estimate_block_count(message_size); header* const hdr = get_header(); if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) - BOOST_THROW_EXCEPTION(logic_error("Message size exceeds the interprocess queue capacity")); + BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); if (m_stop) return aborted; @@ -316,25 +316,28 @@ public: if ((hdr->m_capacity - hdr->m_size) >= block_count) break; - if (BOOST_UNLIKELY(m_overflow_policy == throw_on_overflow)) - BOOST_THROW_EXCEPTION(capacity_limit_reached("Interprocess queue is full")); + const overflow_policy oflow_policy = m_overflow_policy; + if (oflow_policy == fail_on_overflow) + return no_space; + else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow)) + BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full"); hdr->m_nonfull_queue.wait(hdr->m_mutex); } - put_message(message_data, message_size, block_count); + enqueue_message(message_data, message_size, block_count); return succeeded; } - bool try_send(void const* message_data, uint32_t message_size) + bool try_send(void const* message_data, size_type message_size) { const uint32_t block_count = estimate_block_count(message_size); header* const hdr = get_header(); if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) - BOOST_THROW_EXCEPTION(logic_error("Message size exceeds the interprocess queue capacity")); + BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); if (m_stop) return false; @@ -348,7 +351,7 @@ public: if ((hdr->m_capacity - hdr->m_size) < block_count) return false; - put_message(message_data, message_size, block_count); + enqueue_message(message_data, message_size, block_count); return true; } @@ -373,7 +376,7 @@ public: hdr->m_nonempty_queue.wait(hdr->m_mutex); } - get_message(handler, state); + dequeue_message(handler, state); return succeeded; } @@ -390,7 +393,7 @@ public: if (hdr->m_size == 0u) return false; - get_message(handler, state); + dequeue_message(handler, state); return true; } @@ -429,12 +432,12 @@ private: return static_cast< header* >(m_region.get_address()); } - static std::size_t estimate_region_size(uint32_t capacity, uint32_t block_size) BOOST_NOEXCEPT + static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT { return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size); } - void create_region(uint32_t capacity, uint32_t block_size) + void create_region(uint32_t capacity, size_type block_size) { const std::size_t shmem_size = estimate_region_size(capacity, block_size); m_shared_memory.truncate(shmem_size); @@ -527,7 +530,7 @@ private: } } - void init_block_size(uint32_t block_size) + void init_block_size(size_type block_size) { m_block_size_mask = block_size - 1u; @@ -597,25 +600,25 @@ private: } //! Returns the number of allocation blocks that are required to store user's payload of the specified size - uint32_t estimate_block_count(uint32_t size) const BOOST_NOEXCEPT + uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT { // ceil((size + get_header_overhead()) / block_size) - return (size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2; + return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2); } //! Puts the message to the back of the queue - void put_message(void const* message_data, uint32_t message_size, uint32_t block_count) + void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count) { header* const hdr = get_header(); const uint32_t capacity = hdr->m_capacity; - const uint32_t block_size = hdr->m_block_size; + const size_type block_size = hdr->m_block_size; uint32_t pos = hdr->m_put_pos; block_header* block = hdr->get_block(pos); block->m_size = message_size; - uint32_t write_size = (std::min)((capacity - pos) * block_size - block_header::get_header_overhead(), message_size); + size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); std::memcpy(block->get_data(), message_data, write_size); pos += block_count; @@ -638,21 +641,21 @@ private: } //! Retrieves the next message and invokes the handler to store the message contents - void get_message(receive_handler handler, void* state) + void dequeue_message(receive_handler handler, void* state) { header* const hdr = get_header(); const uint32_t capacity = hdr->m_capacity; - const uint32_t block_size = hdr->m_block_size; + const size_type block_size = hdr->m_block_size; uint32_t pos = hdr->m_get_pos; block_header* block = hdr->get_block(pos); - uint32_t message_size = block->m_size; + size_type message_size = block->m_size; uint32_t block_count = estimate_block_count(message_size); BOOST_ASSERT(block_count <= hdr->m_size); - uint32_t read_size = (std::min)((capacity - pos) * block_size - block_header::get_header_overhead(), message_size); + size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); handler(state, block->get_data(), read_size); pos += block_count; @@ -672,14 +675,14 @@ private: } }; -BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, uint32_t block_size, overflow_policy oflow_policy, permissions const& perms) +BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) { BOOST_ASSERT(m_impl == NULL); if (!boost::log::aux::is_power_of_2(block_size)) BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); try { - m_impl = new implementation(open_mode::create_only, name, capacity, boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE), oflow_policy, perms); + m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); } catch (boost::exception& e) { @@ -692,14 +695,14 @@ BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint3 } } -BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, uint32_t block_size, overflow_policy oflow_policy, permissions const& perms) +BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) { BOOST_ASSERT(m_impl == NULL); if (!boost::log::aux::is_power_of_2(block_size)) BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); try { - m_impl = new implementation(open_mode::open_or_create, name, capacity, boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE), oflow_policy, perms); + m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); } catch (boost::exception& e) { @@ -756,7 +759,7 @@ BOOST_LOG_API uint32_t reliable_message_queue::capacity() const return m_impl->capacity(); } -BOOST_LOG_API uint32_t reliable_message_queue::block_size() const +BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const { BOOST_ASSERT(m_impl != NULL); return m_impl->block_size(); @@ -796,7 +799,7 @@ BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT m_impl = NULL; } -BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, uint32_t message_size) +BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size) { BOOST_ASSERT(m_impl != NULL); try @@ -810,7 +813,7 @@ BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::s } } -BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, uint32_t message_size) +BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size) { BOOST_ASSERT(m_impl != NULL); try @@ -853,7 +856,7 @@ BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handle } //! Fixed buffer receive handler -BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, uint32_t size) +BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size) { fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state); if (BOOST_UNLIKELY(size > p->size)) diff --git a/src/windows/ipc_reliable_message_queue.cpp b/src/windows/ipc_reliable_message_queue.cpp index 7ccf91e..4175f24 100644 --- a/src/windows/ipc_reliable_message_queue.cpp +++ b/src/windows/ipc_reliable_message_queue.cpp @@ -72,12 +72,12 @@ private: enum { data_alignment = 32u }; //! Size of the element data, in bytes - uint32_t m_size; + size_type m_size; //! Returns the block header overhead, in bytes - static BOOST_CONSTEXPR uint32_t get_header_overhead() BOOST_NOEXCEPT + static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT { - return static_cast< uint32_t >(boost::alignment::align_up(sizeof(block_header), data_alignment)); + return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment)); } //! Returns a pointer to the element data @@ -104,7 +104,7 @@ private: //! Number of allocation blocks in the queue. const uint32_t m_capacity; //! Size of an allocation block, in bytes. - const uint32_t m_block_size; + const size_type m_block_size; //! Shared state of the mutex for protecting queue data structures. boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state; //! Shared state of the condition variable used to block writers when the queue is full. @@ -116,7 +116,7 @@ private: //! The current reading position (allocation block index). uint32_t m_get_pos; - header(uint32_t capacity, uint32_t block_size) : + header(uint32_t capacity, size_type block_size) : m_abi_tag(get_abi_tag()), m_capacity(capacity), m_block_size(block_size), @@ -184,7 +184,7 @@ private: //! Queue overflow handling policy const overflow_policy m_overflow_policy; //! The mask for selecting bits that constitute size values from 0 to (block_size - 1) - uint32_t m_block_size_mask; + size_type m_block_size_mask; //! The number of the bit set in block_size (i.e. log base 2 of block_size) uint32_t m_block_size_log2; @@ -207,7 +207,7 @@ public: open_mode::create_only_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy, permissions const& perms ) : @@ -230,7 +230,7 @@ public: open_mode::open_or_create_tag, object_name const& name, uint32_t capacity, - uint32_t block_size, + size_type block_size, overflow_policy oflow_policy, permissions const& perms ) : @@ -280,19 +280,19 @@ public: return get_header()->m_capacity; } - uint32_t block_size() const BOOST_NOEXCEPT + size_type block_size() const BOOST_NOEXCEPT { return get_header()->m_block_size; } - operation_result send(void const* message_data, uint32_t message_size) + operation_result send(void const* message_data, size_type message_size) { const uint32_t block_count = estimate_block_count(message_size); header* const hdr = get_header(); if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) - BOOST_THROW_EXCEPTION(logic_error("Message size exceeds the interprocess queue capacity")); + BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); if (!lock_queue()) return aborted; @@ -304,26 +304,29 @@ public: if ((hdr->m_capacity - hdr->m_size) >= block_count) break; - if (BOOST_UNLIKELY(m_overflow_policy == throw_on_overflow)) - BOOST_THROW_EXCEPTION(capacity_limit_reached("Interprocess queue is full")); + const overflow_policy oflow_policy = m_overflow_policy; + if (oflow_policy == fail_on_overflow) + return no_space; + else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow)) + BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full"); if (!m_nonfull_queue.wait(unlock, m_stop.get())) return aborted; } - put_message(message_data, message_size, block_count); + enqueue_message(message_data, message_size, block_count); return succeeded; } - bool try_send(void const* message_data, uint32_t message_size) + bool try_send(void const* message_data, size_type message_size) { const uint32_t block_count = estimate_block_count(message_size); header* const hdr = get_header(); if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) - BOOST_THROW_EXCEPTION(logic_error("Message size exceeds the interprocess queue capacity")); + BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); if (!lock_queue()) return aborted; @@ -333,7 +336,7 @@ public: if ((hdr->m_capacity - hdr->m_size) < block_count) return false; - put_message(message_data, message_size, block_count); + enqueue_message(message_data, message_size, block_count); return true; } @@ -361,7 +364,7 @@ public: unlock.engage(m_mutex); } - get_message(handler, state); + dequeue_message(handler, state); return succeeded; } @@ -377,7 +380,7 @@ public: if (hdr->m_size == 0u) return false; - get_message(handler, state); + dequeue_message(handler, state); return true; } @@ -405,7 +408,7 @@ private: return static_cast< header* >(m_shared_memory.address()); } - static std::size_t estimate_region_size(uint32_t capacity, uint32_t block_size) BOOST_NOEXCEPT + static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT { return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size); } @@ -438,7 +441,7 @@ private: m_stop.init(h); } - void create_queue(std::wstring const& name, uint32_t capacity, uint32_t block_size, permissions const& perms) + void create_queue(std::wstring const& name, uint32_t capacity, size_type block_size, permissions const& perms) { // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized header* const hdr = get_header(); @@ -499,7 +502,7 @@ private: init_block_size(hdr->m_block_size); } - void init_block_size(uint32_t block_size) + void init_block_size(size_type block_size) { m_block_size_mask = block_size - 1u; @@ -546,25 +549,25 @@ private: } //! Returns the number of allocation blocks that are required to store user's payload of the specified size - uint32_t estimate_block_count(uint32_t size) const BOOST_NOEXCEPT + uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT { // ceil((size + get_header_overhead()) / block_size) - return (size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2; + return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2); } //! Puts the message to the back of the queue - void put_message(void const* message_data, uint32_t message_size, uint32_t block_count) + void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count) { header* const hdr = get_header(); const uint32_t capacity = hdr->m_capacity; - const uint32_t block_size = hdr->m_block_size; + const size_type block_size = hdr->m_block_size; uint32_t pos = hdr->m_put_pos; block_header* block = hdr->get_block(pos); block->m_size = message_size; - uint32_t write_size = (std::min)((capacity - pos) * block_size - block_header::get_header_overhead(), message_size); + size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); std::memcpy(block->get_data(), message_data, write_size); pos += block_count; @@ -587,21 +590,21 @@ private: } //! Retrieves the next message and invokes the handler to store the message contents - void get_message(receive_handler handler, void* state) + void dequeue_message(receive_handler handler, void* state) { header* const hdr = get_header(); const uint32_t capacity = hdr->m_capacity; - const uint32_t block_size = hdr->m_block_size; + const size_type block_size = hdr->m_block_size; uint32_t pos = hdr->m_get_pos; block_header* block = hdr->get_block(pos); - uint32_t message_size = block->m_size; + size_type message_size = block->m_size; uint32_t block_count = estimate_block_count(message_size); BOOST_ASSERT(block_count <= hdr->m_size); - uint32_t read_size = (std::min)((capacity - pos) * block_size - block_header::get_header_overhead(), message_size); + size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); handler(state, block->get_data(), read_size); pos += block_count; @@ -621,7 +624,7 @@ private: } }; -BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, uint32_t block_size, overflow_policy oflow_policy, permissions const& perms) +BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) { BOOST_ASSERT(m_impl == NULL); if (!boost::log::aux::is_power_of_2(block_size)) @@ -637,14 +640,14 @@ BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint3 } } -BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, uint32_t block_size, overflow_policy oflow_policy, permissions const& perms) +BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) { BOOST_ASSERT(m_impl == NULL); if (!boost::log::aux::is_power_of_2(block_size)) BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); try { - m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< uint32_t >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); + m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); } catch (boost::exception& e) { @@ -693,7 +696,7 @@ BOOST_LOG_API uint32_t reliable_message_queue::capacity() const return m_impl->capacity(); } -BOOST_LOG_API uint32_t reliable_message_queue::block_size() const +BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const { BOOST_ASSERT(m_impl != NULL); return m_impl->block_size(); @@ -733,7 +736,7 @@ BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT m_impl = NULL; } -BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, uint32_t message_size) +BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size) { BOOST_ASSERT(m_impl != NULL); try @@ -747,7 +750,7 @@ BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::s } } -BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, uint32_t message_size) +BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size) { BOOST_ASSERT(m_impl != NULL); try @@ -790,7 +793,7 @@ BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handle } //! Fixed buffer receive handler -BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, uint32_t size) +BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size) { fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state); if (BOOST_UNLIKELY(size > p->size)) diff --git a/test/run/util_ipc_reliable_mq.cpp b/test/run/util_ipc_reliable_mq.cpp index 686b93a..3bff870 100644 --- a/test/run/util_ipc_reliable_mq.cpp +++ b/test/run/util_ipc_reliable_mq.cpp @@ -209,6 +209,17 @@ BOOST_AUTO_TEST_CASE(message_passing) BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0); } + // send() with an error code on overflow + { + queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::fail_on_overflow); + BOOST_TEST_PASSPOINT(); + BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded); + BOOST_TEST_PASSPOINT(); + + queue_t::operation_result res = queue_a.send(message1, sizeof(message1) - 1u); + BOOST_CHECK_EQUAL(res, queue_t::no_space); + } + // send() with an exception on overflow { queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::throw_on_overflow);