diff --git a/include/boost/redis/config.hpp b/include/boost/redis/config.hpp index ed8b419f..06c81002 100644 --- a/include/boost/redis/config.hpp +++ b/include/boost/redis/config.hpp @@ -88,12 +88,20 @@ struct config { */ std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1}; - /** @brief Maximum size of a socket read, in bytes. + /** @brief Maximum size of the read-buffer in bytes. * * Sets a limit on how much data is allowed to be read into the * read buffer. It can be used to prevent DDOS. */ std::size_t max_read_size = (std::numeric_limits::max)(); + + /** @brief read_buffer_append_size + * + * The size by which the read buffer grows when more space is + * needed. There is no need to set this too high because memory is + * reused and the growth will tend to zero. + */ + std::size_t read_buffer_append_size = 4096; }; } // namespace boost::redis diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 17c151a5..ac078665 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -57,56 +57,6 @@ namespace boost::redis { namespace detail { -template -class append_some_op { -private: - AsyncReadStream& stream_; - DynamicBuffer buf_; - std::size_t size_ = 0; - std::size_t tmp_ = 0; - asio::coroutine coro_{}; - -public: - append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size) - : stream_{stream} - , buf_{std::move(buf)} - , size_{size} - { } - - template - void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) - { - BOOST_ASIO_CORO_REENTER(coro_) - { - tmp_ = buf_.size(); - buf_.grow(size_); - - BOOST_ASIO_CORO_YIELD - stream_.async_read_some(buf_.data(tmp_, size_), std::move(self)); - if (ec) { - self.complete(ec, 0); - return; - } - - buf_.shrink(buf_.size() - tmp_ - n); - self.complete({}, n); - } - } -}; - -template -auto async_append_some( - AsyncReadStream& stream, - DynamicBuffer buffer, - std::size_t size, - CompletionToken&& token) -{ - return asio::async_compose( - append_some_op{stream, buffer, size}, - token, - stream); -} - template using exec_notifier_type = asio::experimental::channel< Executor, @@ -209,33 +159,18 @@ struct writer_op { template struct reader_op { - using dyn_buffer_type = asio::dynamic_string_buffer< - char, - std::char_traits, - std::allocator>; - - // TODO: Move this to config so the user can fine tune? - static constexpr std::size_t buffer_growth_hint = 4096; - Conn* conn_; - detail::reader_fsm fsm_; public: reader_op(Conn& conn) noexcept : conn_{&conn} - , fsm_{conn.mpx_} { } template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { - using dyn_buffer_type = asio::dynamic_string_buffer< - char, - std::char_traits, - std::allocator>; - for (;;) { - auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); + auto act = conn_->read_fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); conn_->logger_.on_fsm_resume(act); @@ -245,11 +180,10 @@ public: continue; case reader_fsm::action::type::needs_more: case reader_fsm::action::type::append_some: - async_append_some( - conn_->stream_, - dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, - conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), - std::move(self)); + { + auto const buf = conn_->read_fsm_.get_append_buffer(); + conn_->stream_.async_read_some(asio::buffer(buf), std::move(self)); + } return; case reader_fsm::action::type::notify_push_receiver: if (conn_->receive_channel_.try_send(ec, act.push_size_)) { @@ -343,6 +277,7 @@ public: // If we were successful, run all the connection tasks if (!ec) { conn_->mpx_.reset(); + conn_->read_fsm_.reset(); // Note: Order is important here because the writer might // trigger an async_write before the async_hello thereby @@ -450,6 +385,7 @@ public: , reconnect_timer_{ex} , receive_channel_{ex, 256} , health_checker_{ex} + , read_fsm_{mpx_} , logger_{std::move(lgr)} { set_receive_response(ignore); @@ -553,6 +489,7 @@ public: cfg_ = cfg; health_checker_.set_config(cfg); handshaker_.set_config(cfg); + read_fsm_.set_config({cfg_.read_buffer_append_size, cfg_.max_read_size}); return asio::async_compose( detail::run_op{this}, @@ -951,6 +888,7 @@ private: config cfg_; detail::multiplexer mpx_; + detail::reader_fsm read_fsm_; detail::connection_logger logger_; }; diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 8b01409e..fde726f3 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -8,13 +8,13 @@ #define BOOST_REDIS_MULTIPLEXER_HPP #include -#include -#include -#include +#include +#include +#include #include #include -#include +#include #include #include @@ -32,7 +32,8 @@ namespace detail { using tribool = std::optional; -struct multiplexer { +class multiplexer { +public: using adapter_type = std::function; using pipeline_adapter_type = std::function< void(std::size_t, resp3::node_view const&, system::error_code&)>; @@ -127,7 +128,8 @@ struct multiplexer { // If the tribool contains no value more data is needed, otherwise // if the value is true the message consumed is a push. [[nodiscard]] - auto consume_next(system::error_code& ec) -> std::pair; + auto consume_next(std::string_view data, system::error_code& ec) + -> std::pair; auto add(std::shared_ptr const& ptr) -> void; auto reset() -> void; @@ -156,18 +158,6 @@ struct multiplexer { return std::string_view{write_buffer_}; } - [[nodiscard]] - auto get_read_buffer() noexcept -> std::string& - { - return read_buffer_; - } - - [[nodiscard]] - auto get_read_buffer() const noexcept -> std::string const& - { - return read_buffer_; - } - // TODO: Change signature to receive an adapter instead of a // response. template @@ -191,17 +181,18 @@ private: [[nodiscard]] auto is_waiting_response() const noexcept -> bool; - [[nodiscard]] - auto on_finish_parsing(bool is_push) -> std::size_t; + void commit_usage(bool is_push, std::size_t size); [[nodiscard]] - auto is_next_push() const noexcept -> bool; + auto is_next_push(std::string_view data) const noexcept -> bool; // Releases the number of requests that have been released. [[nodiscard]] auto release_push_requests() -> std::size_t; - std::string read_buffer_; + [[nodiscard]] + tribool consume_next_impl(std::string_view data, system::error_code& ec); + std::string write_buffer_; std::deque> reqs_; resp3::parser parser_{}; diff --git a/include/boost/redis/detail/read_buffer.hpp b/include/boost/redis/detail/read_buffer.hpp new file mode 100644 index 00000000..cad248ec --- /dev/null +++ b/include/boost/redis/detail/read_buffer.hpp @@ -0,0 +1,57 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_READ_BUFFER_HPP +#define BOOST_REDIS_READ_BUFFER_HPP + +#include + +#include +#include +#include +#include + +namespace boost::redis::detail { + +class read_buffer { +public: + using span_type = span; + + [[nodiscard]] + system::error_code prepare_append(std::size_t append_size, std::size_t max_buffer_size); + + void commit_append(std::size_t read_size); + + [[nodiscard]] + auto get_append_buffer() noexcept -> span_type; + + [[nodiscard]] + auto get_committed_buffer() const noexcept -> std::string_view; + + [[nodiscard]] + auto get_committed_size() const noexcept -> std::size_t; + + void clear(); + + // Consume committed data. + auto consume_committed(std::size_t size) -> std::size_t; + + void reserve(std::size_t n); + + friend + bool operator==(read_buffer const& lhs, read_buffer const& rhs); + + friend + bool operator!=(read_buffer const& lhs, read_buffer const& rhs); + +private: + std::vector buffer_; + std::size_t append_buf_begin_ = 0; +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_READ_BUFFER_HPP diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 575ee97f..09763366 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -18,6 +18,12 @@ namespace boost::redis::detail { class reader_fsm { public: + // See config.hpp for the meaning of these parameters. + struct config { + std::size_t read_buffer_append_size = 4096; + std::size_t max_read_size = -1; + }; + struct action { enum class type { @@ -41,8 +47,20 @@ public: system::error_code ec, asio::cancellation_type_t /*cancel_state*/); + void set_config(config const& cfg) noexcept { cfg_ = cfg; }; + + void reset(); + + [[nodiscard]] + auto get_append_buffer() noexcept + { + return read_buffer_.get_append_buffer(); + } + private: int resume_point_{0}; + read_buffer read_buffer_; + config cfg_; action action_after_resume_; action::type next_read_type_ = action::type::append_some; multiplexer* mpx_ = nullptr; diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 6ce83555..8008265f 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -88,6 +88,9 @@ enum class error /// The configuration specified UNIX sockets with SSL, which is not supported. unix_sockets_ssl_unsupported, + + /// The size of the read buffer would exceed it maximum configured value. + exceeds_maximum_read_buffer_size, }; /** diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index f1f61960..2c2d0b63 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -50,6 +50,8 @@ struct error_category_impl : system::error_category { "supported by the system."; case error::unix_sockets_ssl_unsupported: return "The configuration specified UNIX sockets with SSL, which is not supported."; + case error::exceeds_maximum_read_buffer_size: + return "The size of the read buffer would exceed it maximum configured value"; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 021a4e9f..a4007c2e 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr const& info) } } -std::pair multiplexer::consume_next(system::error_code& ec) +tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec) { // We arrive here in two states: // @@ -93,18 +93,16 @@ std::pair multiplexer::consume_next(system::error_code& ec // 2. On a new message, in which case we have to determine // whether the next messag is a push or a response. // + + BOOST_ASSERT(!data.empty()); if (!on_push_) // Prepare for new message. - on_push_ = is_next_push(); + on_push_ = is_next_push(data); if (on_push_) { - if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec)) - return std::make_pair(std::nullopt, 0); + if (!resp3::parse(parser_, data, receive_adapter_, ec)) + return std::nullopt; - if (ec) - return std::make_pair(std::make_optional(true), 0); - - auto const size = on_finish_parsing(true); - return std::make_pair(std::make_optional(true), size); + return std::make_optional(true); } BOOST_ASSERT_MSG( @@ -114,13 +112,13 @@ std::pair multiplexer::consume_next(system::error_code& ec BOOST_ASSERT(reqs_.front() != nullptr); BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0); - if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec)) - return std::make_pair(std::nullopt, 0); + if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec)) + return std::nullopt; if (ec) { reqs_.front()->notify_error(ec); reqs_.pop_front(); - return std::make_pair(std::make_optional(false), 0); + return std::make_optional(false); } reqs_.front()->commit_response(parser_.get_consumed()); @@ -130,14 +128,31 @@ std::pair multiplexer::consume_next(system::error_code& ec reqs_.pop_front(); } - auto const size = on_finish_parsing(false); - return std::make_pair(std::make_optional(false), size); + return std::make_optional(false); +} + +std::pair multiplexer::consume_next( + std::string_view data, + system::error_code& ec) +{ + auto const ret = consume_next_impl(data, ec); + auto const consumed = parser_.get_consumed(); + if (ec) { + return std::make_pair(ret, consumed); + } + + if (ret.has_value()) { + parser_.reset(); + commit_usage(ret.value(), consumed); + return std::make_pair(ret, consumed); + } + + return std::make_pair(std::nullopt, consumed); } void multiplexer::reset() { write_buffer_.clear(); - read_buffer_.clear(); parser_.reset(); on_push_ = false; cancel_run_called_ = false; @@ -222,35 +237,29 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t return ret; } -std::size_t multiplexer::on_finish_parsing(bool is_push) +void multiplexer::commit_usage(bool is_push, std::size_t size) { if (is_push) { usage_.pushes_received += 1; - usage_.push_bytes_received += parser_.get_consumed(); + usage_.push_bytes_received += size; + on_push_ = false; } else { usage_.responses_received += 1; - usage_.response_bytes_received += parser_.get_consumed(); + usage_.response_bytes_received += size; } - - on_push_ = false; - read_buffer_.erase(0, parser_.get_consumed()); - auto const size = parser_.get_consumed(); - parser_.reset(); - return size; } -bool multiplexer::is_next_push() const noexcept +bool multiplexer::is_next_push(std::string_view data) const noexcept { - BOOST_ASSERT(!read_buffer_.empty()); - // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 // - https://github.com/boostorg/redis/issues/170 - // The message's resp3 type is a push. - if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + // Test if the message resp3 type is a push. + BOOST_ASSERT(!data.empty()); + if (resp3::to_type(data.front()) == resp3::type::push) return true; // This is non-push type and the requests queue is empty. I have diff --git a/include/boost/redis/impl/read_buffer.ipp b/include/boost/redis/impl/read_buffer.ipp new file mode 100644 index 00000000..0c18f857 --- /dev/null +++ b/include/boost/redis/impl/read_buffer.ipp @@ -0,0 +1,87 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include + +#include +#include + +#include + +namespace boost::redis::detail { + +system::error_code +read_buffer::prepare_append(std::size_t append_size, std::size_t max_buffer_size) +{ + BOOST_ASSERT(append_buf_begin_ == buffer_.size()); + + auto const new_size = append_buf_begin_ + append_size; + + if (new_size > max_buffer_size) { + return error::exceeds_maximum_read_buffer_size; + } + + buffer_.resize(new_size); + return {}; +} + +void read_buffer::commit_append(std::size_t read_size) +{ + BOOST_ASSERT(buffer_.size() >= (append_buf_begin_ + read_size)); + buffer_.resize(append_buf_begin_ + read_size); + append_buf_begin_ = buffer_.size(); +} + +auto read_buffer::get_append_buffer() noexcept -> span_type +{ + auto const size = buffer_.size(); + return make_span(buffer_.data() + append_buf_begin_, size - append_buf_begin_); +} + +auto read_buffer::get_committed_buffer() const noexcept -> std::string_view +{ + BOOST_ASSERT(!buffer_.empty()); + return {buffer_.data(), append_buf_begin_}; +} + +auto read_buffer::get_committed_size() const noexcept -> std::size_t { return append_buf_begin_; } + +void read_buffer::clear() +{ + buffer_.clear(); + append_buf_begin_ = 0; +} + +std::size_t read_buffer::consume_committed(std::size_t size) +{ + // Consumes only committed data. + if (size > append_buf_begin_) + size = append_buf_begin_; + + buffer_.erase(buffer_.begin(), buffer_.begin() + size); + BOOST_ASSERT(append_buf_begin_ >= size); + append_buf_begin_ -= size; + return size; +} + +void read_buffer::reserve(std::size_t n) +{ + buffer_.reserve(n); +} + +bool operator==(read_buffer const& lhs, read_buffer const& rhs) +{ + return + lhs.buffer_ == rhs.buffer_ && + lhs.append_buf_begin_ == rhs.append_buf_begin_; +} + +bool operator!=(read_buffer const& lhs, read_buffer const& rhs) +{ + return !(lhs == rhs); +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index b248f315..73b03772 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -24,22 +24,32 @@ reader_fsm::action reader_fsm::resume( BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { - BOOST_REDIS_YIELD(resume_point_, 2, next_read_type_) + ec = read_buffer_.prepare_append(cfg_.read_buffer_append_size, cfg_.max_read_size); + if (ec) { + action_after_resume_ = {action::type::done, 0, ec}; + BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run) + return action_after_resume_; + } + + BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) + read_buffer_.commit_append(bytes_read); if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. action_after_resume_ = {action::type::done, bytes_read, ec}; - BOOST_REDIS_YIELD(resume_point_, 3, action::type::cancel_run) + BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) return action_after_resume_; } next_read_type_ = action::type::append_some; - while (!mpx_->get_read_buffer().empty()) { - res_ = mpx_->consume_next(ec); + while (read_buffer_.get_committed_size() != 0) { + res_ = mpx_->consume_next(read_buffer_.get_committed_buffer(), ec); if (ec) { + // TODO: Perhaps log what has not been consumed to aid + // debugging. action_after_resume_ = {action::type::done, res_.second, ec}; - BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) + BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run) return action_after_resume_; } @@ -48,6 +58,8 @@ reader_fsm::action reader_fsm::resume( break; } + read_buffer_.consume_committed(res_.second); + if (res_.first.value()) { BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) if (ec) { @@ -71,4 +83,12 @@ reader_fsm::action reader_fsm::resume( return {action::type::done, 0, system::error_code()}; } +void reader_fsm::reset() +{ + resume_point_ = 0; + next_read_type_ = action::type::append_some; + res_ = {std::make_pair(std::nullopt, 0)}; + read_buffer_.clear(); +} + } // namespace boost::redis::detail diff --git a/include/boost/redis/resp3/impl/parser.ipp b/include/boost/redis/resp3/impl/parser.ipp index 0d126a57..c53920d5 100644 --- a/include/boost/redis/resp3/impl/parser.ipp +++ b/include/boost/redis/resp3/impl/parser.ipp @@ -34,17 +34,6 @@ void parser::reset() sizes_[0] = 2; // The sentinel must be more than 1. } -std::size_t parser::get_suggested_buffer_growth(std::size_t hint) const noexcept -{ - if (!bulk_expected()) - return hint; - - if (hint < bulk_length_ + 2) - return bulk_length_ + 2; - - return hint; -} - std::size_t parser::get_consumed() const noexcept { return consumed_; } bool parser::done() const noexcept diff --git a/include/boost/redis/resp3/parser.hpp b/include/boost/redis/resp3/parser.hpp index 4a95a984..52018074 100644 --- a/include/boost/redis/resp3/parser.hpp +++ b/include/boost/redis/resp3/parser.hpp @@ -67,8 +67,6 @@ public: [[nodiscard]] auto done() const noexcept -> bool; - auto get_suggested_buffer_growth(std::size_t hint) const noexcept -> std::size_t; - auto get_consumed() const noexcept -> std::size_t; auto consume(std::string_view view, system::error_code& ec) noexcept -> result; diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index f3a2ac4b..144b3a4a 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include diff --git a/test/common.cpp b/test/common.cpp index ceb6b35f..3e9b174b 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -50,7 +50,6 @@ boost::redis::config make_test_config() { boost::redis::config cfg; cfg.addr.host = get_server_hostname(); - cfg.max_read_size = 1000000; return cfg; } @@ -69,3 +68,10 @@ void run_coroutine_test(net::awaitable op, std::chrono::steady_clock::dura throw std::runtime_error("Coroutine test did not finish"); } #endif // BOOST_ASIO_HAS_CO_AWAIT + +void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data) +{ + auto const buffer = fsm.get_append_buffer(); + BOOST_ASSERT(data.size() <= buffer.size()); + std::copy(data.begin(), data.end(), buffer.begin()); +} diff --git a/test/common.hpp b/test/common.hpp index b322dc54..090eef95 100644 --- a/test/common.hpp +++ b/test/common.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -34,3 +35,5 @@ void run( boost::redis::config cfg = make_test_config(), boost::system::error_code ec = boost::asio::error::operation_aborted, boost::redis::operation op = boost::redis::operation::receive); + +void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data); diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 32b9a72b..7f3711c5 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -15,6 +15,8 @@ #include #include +#include "common.hpp" + #include #include #include @@ -117,8 +119,7 @@ void test_success() BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed @@ -159,10 +160,9 @@ void test_parse_error() // The second field should be a number (rather than the empty string). // Note that although part of the buffer was consumed, the multiplexer // currently throws this information away. - mpx.get_read_buffer() = "*2\r\n$5\r\nhello\r\n:\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec); BOOST_TEST_EQ(ec, error::empty_field); - BOOST_TEST_EQ(req_status.second, 0u); + BOOST_TEST_EQ(req_status.second, 15u); BOOST_TEST_EQ(input.done_calls, 1u); // This will awaken the exec operation, and should complete the operation @@ -218,8 +218,7 @@ void test_not_connected() BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed @@ -342,8 +341,7 @@ void test_cancel_notwaiting_notterminal() BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ_MSG(ec, error_code(), tc.name); BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed diff --git a/test/test_low_level.cpp b/test/test_low_level.cpp index edd7cc4b..c50d2b0f 100644 --- a/test/test_low_level.cpp +++ b/test/test_low_level.cpp @@ -528,6 +528,7 @@ BOOST_AUTO_TEST_CASE(cover_error) check_error("boost.redis", boost::redis::error::sync_receive_push_failed); check_error("boost.redis", boost::redis::error::incompatible_node_depth); check_error("boost.redis", boost::redis::error::resp3_hello); + check_error("boost.redis", boost::redis::error::exceeds_maximum_read_buffer_size); } std::string get_type_as_str(boost::redis::resp3::type t) diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 24bf7c67..7ee2e5b0 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -11,9 +11,12 @@ #include #include #include +#include #define BOOST_TEST_MODULE conn_quit #include +#include "common.hpp" + #include #include @@ -30,6 +33,7 @@ using boost::redis::generic_response; using boost::redis::resp3::node; using boost::redis::resp3::to_string; using boost::redis::any_adapter; +using boost::system::error_code; BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) { @@ -258,10 +262,8 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) generic_response resp; mpx.set_receive_response(resp); - mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n"; - boost::system::error_code ec; - auto const ret = mpx.consume_next(ec); + auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -282,16 +284,17 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) generic_response resp; mpx.set_receive_response(resp); + std::string msg; // Only part of the message. - mpx.get_read_buffer() = ">2\r\n+one\r"; + msg += ">2\r\n+one\r"; boost::system::error_code ec; - auto ret = mpx.consume_next(ec); + auto ret = mpx.consume_next(msg, ec); BOOST_TEST(!ret.first.has_value()); - mpx.get_read_buffer().append("\n+two\r\n"); - ret = mpx.consume_next(ec); + msg += "\n+two\r\n"; + ret = mpx.consume_next(msg, ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -378,20 +381,14 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) BOOST_TEST(item2.done); BOOST_TEST(!item3.done); - // Simulates a socket read by putting some data in the read buffer. - mpx.get_read_buffer().append("+one\r\n"); - // Consumes the next message in the read buffer. boost::system::error_code ec; - auto const ret = mpx.consume_next(ec); + auto const ret = mpx.consume_next("+one\r\n", ec); // The read operation should have been successfull. BOOST_TEST(ret.first.has_value()); BOOST_TEST(ret.second != 0u); - // The read buffer should also be empty now - BOOST_TEST(mpx.get_read_buffer().empty()); - // The last request still did not get a response. BOOST_TEST(item1.done); BOOST_TEST(item2.done); @@ -399,3 +396,64 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) // TODO: Check the first request was removed from the queue. } + +BOOST_AUTO_TEST_CASE(read_buffer_prepare_error) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + // Usual case, max size is bigger then requested size. + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + buf.commit_append(10); + + // Corner case, max size is equal to the requested size. + ec = buf.prepare_append(10, 20); + BOOST_TEST(!ec); + buf.commit_append(10); + buf.consume_committed(20); + + auto const tmp = buf; + + // Error case, max size is smaller to the requested size. + ec = buf.prepare_append(10, 9); + BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size}); + + // Check that an error call has no side effects. + auto const res = buf == tmp; + BOOST_TEST(res); +} + +BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + + // No data has been committed yet so nothing can be consummed. + BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u); + + buf.commit_append(10); + + // All five bytes can be consumed. + BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u); + + // Only the remaining five bytes can be consumed + BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u); +} + +BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + + BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u); +} diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 4aa0c430..d1a33133 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -12,6 +12,8 @@ #include #include +#include "common.hpp" + namespace net = boost::asio; namespace redis = boost::redis; using boost::system::error_code; @@ -51,13 +53,15 @@ void test_push() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - mpx.get_read_buffer().append(">1\r\n+msg2 \r\n"); - mpx.get_read_buffer().append(">1\r\n+msg3 \r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n" + ">1\r\n+msg3 \r\n"; + + append_read_data(fsm, payload); // Deliver the 1st push - act = fsm.resume(bytes_read, ec, cancellation_type_t::none); + act = fsm.resume(payload.size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, 11u); BOOST_TEST_EQ(act.ec_, error_code()); @@ -100,20 +104,20 @@ void test_read_needs_more() std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. - mpx.get_read_buffer().append(msg[0]); + append_read_data(fsm, msg[0]); act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the second part to the fsm. - mpx.get_read_buffer().append(msg[1]); + append_read_data(fsm, msg[1]); act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. - mpx.get_read_buffer().append(msg[2]); + append_read_data(fsm, msg[2]); act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); @@ -141,16 +145,16 @@ void test_read_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">1\r\n+msg1\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {net::error::operation_aborted}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish - act = fsm.resume(bytes_read, ec, cancellation_type_t::none); + act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -171,16 +175,16 @@ void test_parse_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">a\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">a\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); } @@ -201,16 +205,16 @@ void test_push_deliver_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">1\r\n+msg1\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.ec_, error_code()); // Resumes from notifying a push with an error. - act = fsm.resume(bytes_read, net::error::operation_aborted, cancellation_type_t::none); + act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); // Finish @@ -219,10 +223,39 @@ void test_push_deliver_error() BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } +void test_max_read_buffer_size() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_response(resp); + reader_fsm fsm{mpx}; + fsm.set_config({5, 7}); + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::append_some); + + // Passes the first part to the fsm. + std::string const part1 = ">3\r\n"; + append_read_data(fsm, part1); + act = fsm.resume(part1.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::cancel_run); + BOOST_TEST_EQ(act.ec_, error_code()); + + act = fsm.resume({}, {}, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); +} + } // namespace int main() { + test_max_read_buffer_size(); test_push_deliver_error(); test_read_needs_more(); test_push();