2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Merge pull request #283 from boostorg/refactoring_clean_code

Removes async_append_some
This commit is contained in:
Marcelo
2025-07-22 20:51:50 +02:00
committed by GitHub
19 changed files with 411 additions and 193 deletions

View File

@@ -88,12 +88,21 @@ struct config {
*/
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
/** @brief Maximum size of a socket read, in bytes.
/** @brief Maximum size of the socket read-buffer in bytes.
*
* Sets a limit on how much data is allowed to be read into the
* read buffer. It can be used to prevent DDOS.
*/
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
/** @brief read_buffer_append_size
*
* The size by which the read buffer grows when more space is
* needed. This can help avoiding some memory allocations. Once the
* maximum size is reached no more memory allocations are made
* since the buffer is reused.
*/
std::size_t read_buffer_append_size = 4096;
};
} // namespace boost::redis

View File

@@ -57,56 +57,6 @@
namespace boost::redis {
namespace detail {
template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
std::size_t size_ = 0;
std::size_t tmp_ = 0;
asio::coroutine coro_{};
public:
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
: stream_{stream}
, buf_{std::move(buf)}
, size_{size}
{ }
template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
BOOST_ASIO_CORO_REENTER(coro_)
{
tmp_ = buf_.size();
buf_.grow(size_);
BOOST_ASIO_CORO_YIELD
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
buf_.shrink(buf_.size() - tmp_ - n);
self.complete({}, n);
}
}
};
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto async_append_some(
AsyncReadStream& stream,
DynamicBuffer buffer,
std::size_t size,
CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
token,
stream);
}
template <class Executor>
using exec_notifier_type = asio::experimental::channel<
Executor,
@@ -209,31 +159,18 @@ struct writer_op {
template <class Conn>
struct reader_op {
using dyn_buffer_type = asio::dynamic_string_buffer<
char,
std::char_traits<char>,
std::allocator<char>>;
// TODO: Move this to config so the user can fine tune?
static constexpr std::size_t buffer_growth_hint = 4096;
Conn* conn_;
detail::reader_fsm fsm_;
public:
reader_op(Conn& conn) noexcept
: conn_{&conn}
, fsm_{conn.mpx_}
, fsm_{conn.read_buffer_, conn.mpx_}
{ }
template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
using dyn_buffer_type = asio::dynamic_string_buffer<
char,
std::char_traits<char>,
std::allocator<char>>;
for (;;) {
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
@@ -245,11 +182,10 @@ public:
continue;
case reader_fsm::action::type::needs_more:
case reader_fsm::action::type::append_some:
async_append_some(
conn_->stream_,
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
{
auto const buf = conn_->read_buffer_.get_append_buffer();
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
}
return;
case reader_fsm::action::type::notify_push_receiver:
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
@@ -342,6 +278,7 @@ public:
// If we were successful, run all the connection tasks
if (!ec) {
conn_->read_buffer_.clear();
conn_->mpx_.reset();
// Note: Order is important here because the writer might
@@ -553,6 +490,11 @@ public:
cfg_ = cfg;
health_checker_.set_config(cfg);
handshaker_.set_config(cfg);
read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
// Reserve some memory to avoid excessive memory allocations in
// the first reads.
read_buffer_.reserve(4048u);
return asio::async_compose<CompletionToken, void(system::error_code)>(
detail::run_op<this_type>{this},
@@ -950,6 +892,7 @@ private:
resp3_handshaker_type handshaker_;
config cfg_;
detail::read_buffer read_buffer_;
detail::multiplexer mpx_;
detail::connection_logger logger_;
};

View File

@@ -8,13 +8,13 @@
#define BOOST_REDIS_MULTIPLEXER_HPP
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/parser.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/usage.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/system/error_code.hpp>
#include <algorithm>
#include <deque>
@@ -32,7 +32,8 @@ namespace detail {
using tribool = std::optional<bool>;
struct multiplexer {
class multiplexer {
public:
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
using pipeline_adapter_type = std::function<
void(std::size_t, resp3::node_view const&, system::error_code&)>;
@@ -127,7 +128,8 @@ struct multiplexer {
// If the tribool contains no value more data is needed, otherwise
// if the value is true the message consumed is a push.
[[nodiscard]]
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
auto consume_next(std::string_view data, system::error_code& ec)
-> std::pair<tribool, std::size_t>;
auto add(std::shared_ptr<elem> const& ptr) -> void;
auto reset() -> void;
@@ -156,18 +158,6 @@ struct multiplexer {
return std::string_view{write_buffer_};
}
[[nodiscard]]
auto get_read_buffer() noexcept -> std::string&
{
return read_buffer_;
}
[[nodiscard]]
auto get_read_buffer() const noexcept -> std::string const&
{
return read_buffer_;
}
// TODO: Change signature to receive an adapter instead of a
// response.
template <class Response>
@@ -191,17 +181,18 @@ private:
[[nodiscard]]
auto is_waiting_response() const noexcept -> bool;
[[nodiscard]]
auto on_finish_parsing(bool is_push) -> std::size_t;
void commit_usage(bool is_push, std::size_t size);
[[nodiscard]]
auto is_next_push() const noexcept -> bool;
auto is_next_push(std::string_view data) const noexcept -> bool;
// Releases the number of requests that have been released.
[[nodiscard]]
auto release_push_requests() -> std::size_t;
std::string read_buffer_;
[[nodiscard]]
tribool consume_next_impl(std::string_view data, system::error_code& ec);
std::string write_buffer_;
std::deque<std::shared_ptr<elem>> reqs_;
resp3::parser parser_{};

View File

@@ -0,0 +1,65 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_READ_BUFFER_HPP
#define BOOST_REDIS_READ_BUFFER_HPP
#include <boost/core/span.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <string_view>
#include <utility>
#include <vector>
namespace boost::redis::detail {
class read_buffer {
public:
using span_type = span<char>;
// See config.hpp for the meaning of these parameters.
struct config {
std::size_t read_buffer_append_size = 4096u;
std::size_t max_read_size = static_cast<std::size_t>(-1);
};
[[nodiscard]]
auto prepare_append() -> system::error_code;
[[nodiscard]]
auto get_append_buffer() noexcept -> span_type;
void commit_append(std::size_t read_size);
[[nodiscard]]
auto get_committed_buffer() const noexcept -> std::string_view;
[[nodiscard]]
auto get_committed_size() const noexcept -> std::size_t;
void clear();
// Consume committed data.
auto consume_committed(std::size_t size) -> std::size_t;
void reserve(std::size_t n);
friend bool operator==(read_buffer const& lhs, read_buffer const& rhs);
friend bool operator!=(read_buffer const& lhs, read_buffer const& rhs);
void set_config(config const& cfg) noexcept { cfg_ = cfg; };
private:
config cfg_ = config{};
std::vector<char> buffer_;
std::size_t append_buf_begin_ = 0;
};
} // namespace boost::redis::detail
#endif // BOOST_REDIS_READ_BUFFER_HPP

View File

@@ -6,7 +6,6 @@
#ifndef BOOST_REDIS_READER_FSM_HPP
#define BOOST_REDIS_READER_FSM_HPP
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/asio/cancellation_type.hpp>
@@ -16,6 +15,8 @@
namespace boost::redis::detail {
class read_buffer;
class reader_fsm {
public:
struct action {
@@ -30,11 +31,11 @@ public:
};
type type_ = type::setup_cancellation;
std::size_t push_size_ = 0;
std::size_t push_size_ = 0u;
system::error_code ec_ = {};
};
explicit reader_fsm(multiplexer& mpx) noexcept;
explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept;
action resume(
std::size_t bytes_read,
@@ -43,6 +44,7 @@ public:
private:
int resume_point_{0};
read_buffer* read_buffer_ = nullptr;
action action_after_resume_;
action::type next_read_type_ = action::type::append_some;
multiplexer* mpx_ = nullptr;

View File

@@ -88,6 +88,9 @@ enum class error
/// The configuration specified UNIX sockets with SSL, which is not supported.
unix_sockets_ssl_unsupported,
/// Reading data from the socket would exceed the maximum size allowed of the read buffer.
exceeds_maximum_read_buffer_size,
};
/**

View File

@@ -50,6 +50,9 @@ struct error_category_impl : system::error_category {
"supported by the system.";
case error::unix_sockets_ssl_unsupported:
return "The configuration specified UNIX sockets with SSL, which is not supported.";
case error::exceeds_maximum_read_buffer_size:
return "Reading data from the socket would exceed the maximum size allowed of the read "
"buffer.";
default: BOOST_ASSERT(false); return "Boost.Redis error.";
}
}

View File

@@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
}
}
std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec)
tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec)
{
// We arrive here in two states:
//
@@ -93,18 +93,16 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
// 2. On a new message, in which case we have to determine
// whether the next messag is a push or a response.
//
BOOST_ASSERT(!data.empty());
if (!on_push_) // Prepare for new message.
on_push_ = is_next_push();
on_push_ = is_next_push(data);
if (on_push_) {
if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec))
return std::make_pair(std::nullopt, 0);
if (!resp3::parse(parser_, data, receive_adapter_, ec))
return std::nullopt;
if (ec)
return std::make_pair(std::make_optional(true), 0);
auto const size = on_finish_parsing(true);
return std::make_pair(std::make_optional(true), size);
return std::make_optional(true);
}
BOOST_ASSERT_MSG(
@@ -114,13 +112,13 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec))
return std::make_pair(std::nullopt, 0);
if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec))
return std::nullopt;
if (ec) {
reqs_.front()->notify_error(ec);
reqs_.pop_front();
return std::make_pair(std::make_optional(false), 0);
return std::make_optional(false);
}
reqs_.front()->commit_response(parser_.get_consumed());
@@ -130,14 +128,31 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
reqs_.pop_front();
}
auto const size = on_finish_parsing(false);
return std::make_pair(std::make_optional(false), size);
return std::make_optional(false);
}
std::pair<tribool, std::size_t> multiplexer::consume_next(
std::string_view data,
system::error_code& ec)
{
auto const ret = consume_next_impl(data, ec);
auto const consumed = parser_.get_consumed();
if (ec) {
return std::make_pair(ret, consumed);
}
if (ret.has_value()) {
parser_.reset();
commit_usage(ret.value(), consumed);
return std::make_pair(ret, consumed);
}
return std::make_pair(std::nullopt, consumed);
}
void multiplexer::reset()
{
write_buffer_.clear();
read_buffer_.clear();
parser_.reset();
on_push_ = false;
cancel_run_called_ = false;
@@ -222,35 +237,29 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t
return ret;
}
std::size_t multiplexer::on_finish_parsing(bool is_push)
void multiplexer::commit_usage(bool is_push, std::size_t size)
{
if (is_push) {
usage_.pushes_received += 1;
usage_.push_bytes_received += parser_.get_consumed();
usage_.push_bytes_received += size;
on_push_ = false;
} else {
usage_.responses_received += 1;
usage_.response_bytes_received += parser_.get_consumed();
usage_.response_bytes_received += size;
}
on_push_ = false;
read_buffer_.erase(0, parser_.get_consumed());
auto const size = parser_.get_consumed();
parser_.reset();
return size;
}
bool multiplexer::is_next_push() const noexcept
bool multiplexer::is_next_push(std::string_view data) const noexcept
{
BOOST_ASSERT(!read_buffer_.empty());
// Useful links to understand the heuristics below.
//
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
// - https://github.com/boostorg/redis/issues/170
// The message's resp3 type is a push.
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
// Test if the message resp3 type is a push.
BOOST_ASSERT(!data.empty());
if (resp3::to_type(data.front()) == resp3::type::push)
return true;
// This is non-push type and the requests queue is empty. I have

View File

@@ -0,0 +1,79 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/assert.hpp>
#include <boost/core/make_span.hpp>
#include <utility>
namespace boost::redis::detail {
system::error_code read_buffer::prepare_append()
{
BOOST_ASSERT(append_buf_begin_ == buffer_.size());
auto const new_size = append_buf_begin_ + cfg_.read_buffer_append_size;
if (new_size > cfg_.max_read_size) {
return error::exceeds_maximum_read_buffer_size;
}
buffer_.resize(new_size);
return {};
}
void read_buffer::commit_append(std::size_t read_size)
{
BOOST_ASSERT(buffer_.size() >= (append_buf_begin_ + read_size));
buffer_.resize(append_buf_begin_ + read_size);
append_buf_begin_ = buffer_.size();
}
auto read_buffer::get_append_buffer() noexcept -> span_type
{
auto const size = buffer_.size();
return make_span(buffer_.data() + append_buf_begin_, size - append_buf_begin_);
}
auto read_buffer::get_committed_buffer() const noexcept -> std::string_view
{
BOOST_ASSERT(!buffer_.empty());
return {buffer_.data(), append_buf_begin_};
}
auto read_buffer::get_committed_size() const noexcept -> std::size_t { return append_buf_begin_; }
void read_buffer::clear()
{
buffer_.clear();
append_buf_begin_ = 0;
}
std::size_t read_buffer::consume_committed(std::size_t size)
{
// For convenience, if the requested size is larger than the
// committed buffer we cap it to the maximum.
if (size > append_buf_begin_)
size = append_buf_begin_;
buffer_.erase(buffer_.begin(), buffer_.begin() + size);
BOOST_ASSERT(append_buf_begin_ >= size);
append_buf_begin_ -= size;
return size;
}
void read_buffer::reserve(std::size_t n) { buffer_.reserve(n); }
bool operator==(read_buffer const& lhs, read_buffer const& rhs)
{
return lhs.buffer_ == rhs.buffer_ && lhs.append_buf_begin_ == rhs.append_buf_begin_;
}
bool operator!=(read_buffer const& lhs, read_buffer const& rhs) { return !(lhs == rhs); }
} // namespace boost::redis::detail

View File

@@ -6,12 +6,14 @@
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
namespace boost::redis::detail {
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
: mpx_{&mpx}
reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept
: read_buffer_{&rbuf}
, mpx_{&mpx}
{ }
reader_fsm::action reader_fsm::resume(
@@ -24,22 +26,32 @@ reader_fsm::action reader_fsm::resume(
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)
for (;;) {
BOOST_REDIS_YIELD(resume_point_, 2, next_read_type_)
ec = read_buffer_->prepare_append();
if (ec) {
action_after_resume_ = {action::type::done, 0, ec};
BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run)
return action_after_resume_;
}
BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_)
read_buffer_->commit_append(bytes_read);
if (ec) {
// TODO: If an error occurred but data was read (i.e.
// bytes_read != 0) we should try to process that data and
// deliver it to the user before calling cancel_run.
action_after_resume_ = {action::type::done, bytes_read, ec};
BOOST_REDIS_YIELD(resume_point_, 3, action::type::cancel_run)
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
return action_after_resume_;
}
next_read_type_ = action::type::append_some;
while (!mpx_->get_read_buffer().empty()) {
res_ = mpx_->consume_next(ec);
while (read_buffer_->get_committed_size() != 0) {
res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec);
if (ec) {
// TODO: Perhaps log what has not been consumed to aid
// debugging.
action_after_resume_ = {action::type::done, res_.second, ec};
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run)
return action_after_resume_;
}
@@ -48,6 +60,8 @@ reader_fsm::action reader_fsm::resume(
break;
}
read_buffer_->consume_committed(res_.second);
if (res_.first.value()) {
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
if (ec) {

View File

@@ -34,17 +34,6 @@ void parser::reset()
sizes_[0] = 2; // The sentinel must be more than 1.
}
std::size_t parser::get_suggested_buffer_growth(std::size_t hint) const noexcept
{
if (!bulk_expected())
return hint;
if (hint < bulk_length_ + 2)
return bulk_length_ + 2;
return hint;
}
std::size_t parser::get_consumed() const noexcept { return consumed_; }
bool parser::done() const noexcept

View File

@@ -67,8 +67,6 @@ public:
[[nodiscard]]
auto done() const noexcept -> bool;
auto get_suggested_buffer_growth(std::size_t hint) const noexcept -> std::size_t;
auto get_consumed() const noexcept -> std::size_t;
auto consume(std::string_view view, system::error_code& ec) noexcept -> result;

View File

@@ -11,6 +11,7 @@
#include <boost/redis/impl/ignore.ipp>
#include <boost/redis/impl/logger.ipp>
#include <boost/redis/impl/multiplexer.ipp>
#include <boost/redis/impl/read_buffer.ipp>
#include <boost/redis/impl/reader_fsm.ipp>
#include <boost/redis/impl/request.ipp>
#include <boost/redis/impl/resp3_handshaker.ipp>

View File

@@ -50,7 +50,6 @@ boost::redis::config make_test_config()
{
boost::redis::config cfg;
cfg.addr.host = get_server_hostname();
cfg.max_read_size = 1000000;
return cfg;
}
@@ -69,3 +68,10 @@ void run_coroutine_test(net::awaitable<void> op, std::chrono::steady_clock::dura
throw std::runtime_error("Coroutine test did not finish");
}
#endif // BOOST_ASIO_HAS_CO_AWAIT
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data)
{
auto const buffer = rbuf.get_append_buffer();
BOOST_ASSERT(data.size() <= buffer.size());
std::copy(data.begin(), data.end(), buffer.begin());
}

View File

@@ -1,6 +1,7 @@
#pragma once
#include <boost/redis/connection.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/operation.hpp>
#include <boost/asio/awaitable.hpp>
@@ -34,3 +35,5 @@ void run(
boost::redis::config cfg = make_test_config(),
boost::system::error_code ec = boost::asio::error::operation_aborted,
boost::redis::operation op = boost::redis::operation::receive);
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data);

View File

@@ -15,6 +15,8 @@
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "common.hpp"
#include <cstddef>
#include <memory>
#include <optional>
@@ -117,8 +119,7 @@ void test_success()
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
// Simulate a successful read
mpx.get_read_buffer() = "$5\r\nhello\r\n";
auto req_status = mpx.consume_next(ec);
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
@@ -159,10 +160,9 @@ void test_parse_error()
// The second field should be a number (rather than the empty string).
// Note that although part of the buffer was consumed, the multiplexer
// currently throws this information away.
mpx.get_read_buffer() = "*2\r\n$5\r\nhello\r\n:\r\n";
auto req_status = mpx.consume_next(ec);
auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec);
BOOST_TEST_EQ(ec, error::empty_field);
BOOST_TEST_EQ(req_status.second, 0u);
BOOST_TEST_EQ(req_status.second, 15u);
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
@@ -218,8 +218,7 @@ void test_not_connected()
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
// Simulate a successful read
mpx.get_read_buffer() = "$5\r\nhello\r\n";
auto req_status = mpx.consume_next(ec);
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
@@ -342,8 +341,7 @@ void test_cancel_notwaiting_notterminal()
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
// Simulate a successful read
mpx.get_read_buffer() = "$5\r\nhello\r\n";
auto req_status = mpx.consume_next(ec);
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push
BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed

View File

@@ -528,6 +528,7 @@ BOOST_AUTO_TEST_CASE(cover_error)
check_error("boost.redis", boost::redis::error::sync_receive_push_failed);
check_error("boost.redis", boost::redis::error::incompatible_node_depth);
check_error("boost.redis", boost::redis::error::resp3_hello);
check_error("boost.redis", boost::redis::error::exceeds_maximum_read_buffer_size);
}
std::string get_type_as_str(boost::redis::resp3::type t)

View File

@@ -7,6 +7,7 @@
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
@@ -14,6 +15,8 @@
#define BOOST_TEST_MODULE conn_quit
#include <boost/test/included/unit_test.hpp>
#include "common.hpp"
#include <iostream>
#include <string>
@@ -30,6 +33,7 @@ using boost::redis::generic_response;
using boost::redis::resp3::node;
using boost::redis::resp3::to_string;
using boost::redis::any_adapter;
using boost::system::error_code;
BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
{
@@ -258,10 +262,8 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
generic_response resp;
mpx.set_receive_response(resp);
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";
boost::system::error_code ec;
auto const ret = mpx.consume_next(ec);
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -282,16 +284,17 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
generic_response resp;
mpx.set_receive_response(resp);
std::string msg;
// Only part of the message.
mpx.get_read_buffer() = ">2\r\n+one\r";
msg += ">2\r\n+one\r";
boost::system::error_code ec;
auto ret = mpx.consume_next(ec);
auto ret = mpx.consume_next(msg, ec);
BOOST_TEST(!ret.first.has_value());
mpx.get_read_buffer().append("\n+two\r\n");
ret = mpx.consume_next(ec);
msg += "\n+two\r\n";
ret = mpx.consume_next(msg, ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
@@ -378,20 +381,14 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
BOOST_TEST(item2.done);
BOOST_TEST(!item3.done);
// Simulates a socket read by putting some data in the read buffer.
mpx.get_read_buffer().append("+one\r\n");
// Consumes the next message in the read buffer.
boost::system::error_code ec;
auto const ret = mpx.consume_next(ec);
auto const ret = mpx.consume_next("+one\r\n", ec);
// The read operation should have been successfull.
BOOST_TEST(ret.first.has_value());
BOOST_TEST(ret.second != 0u);
// The read buffer should also be empty now
BOOST_TEST(mpx.get_read_buffer().empty());
// The last request still did not get a response.
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
@@ -399,3 +396,69 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
// TODO: Check the first request was removed from the queue.
}
BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
{
using boost::redis::detail::read_buffer;
read_buffer buf;
// Usual case, max size is bigger then requested size.
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
buf.commit_append(10);
// Corner case, max size is equal to the requested size.
buf.set_config({10, 20});
ec = buf.prepare_append();
BOOST_TEST(!ec);
buf.commit_append(10);
buf.consume_committed(20);
auto const tmp = buf;
// Error case, max size is smaller to the requested size.
buf.set_config({10, 9});
ec = buf.prepare_append();
BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size});
// Check that an error call has no side effects.
auto const res = buf == tmp;
BOOST_TEST(res);
}
BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data)
{
using boost::redis::detail::read_buffer;
read_buffer buf;
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
// No data has been committed yet so nothing can be consummed.
BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u);
buf.commit_append(10);
// All five bytes can be consumed.
BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u);
// Only the remaining five bytes can be consumed
BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u);
}
BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
{
using boost::redis::detail::read_buffer;
read_buffer buf;
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);
}

View File

@@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/asio/cancellation_type.hpp>
@@ -12,12 +13,15 @@
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "common.hpp"
namespace net = boost::asio;
namespace redis = boost::redis;
using boost::system::error_code;
using net::cancellation_type_t;
using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::detail::read_buffer;
using redis::generic_response;
using action = redis::detail::reader_fsm::action;
@@ -37,10 +41,11 @@ namespace {
void test_push()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -51,13 +56,15 @@ void test_push()
BOOST_TEST_EQ(act.type_, action::type::append_some);
// The fsm is asking for data.
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
mpx.get_read_buffer().append(">1\r\n+msg2 \r\n");
mpx.get_read_buffer().append(">1\r\n+msg3 \r\n");
auto const bytes_read = mpx.get_read_buffer().size();
std::string const payload =
">1\r\n+msg1\r\n"
">1\r\n+msg2 \r\n"
">1\r\n+msg3 \r\n";
append_read_data(rbuf, payload);
// Deliver the 1st push
act = fsm.resume(bytes_read, ec, cancellation_type_t::none);
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, 11u);
BOOST_TEST_EQ(act.ec_, error_code());
@@ -82,10 +89,11 @@ void test_push()
void test_read_needs_more()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -100,20 +108,20 @@ void test_read_needs_more()
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
// Passes the first part to the fsm.
mpx.get_read_buffer().append(msg[0]);
append_read_data(rbuf, msg[0]);
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the second part to the fsm.
mpx.get_read_buffer().append(msg[1]);
append_read_data(rbuf, msg[1]);
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the third and last part to the fsm, next it should ask us
// to deliver the message.
mpx.get_read_buffer().append(msg[2]);
append_read_data(rbuf, msg[2]);
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
@@ -127,10 +135,11 @@ void test_read_needs_more()
void test_read_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -141,26 +150,27 @@ void test_read_error()
BOOST_TEST_EQ(act.type_, action::type::append_some);
// The fsm is asking for data.
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
auto const bytes_read = mpx.get_read_buffer().size();
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(bytes_read, {net::error::operation_aborted}, cancellation_type_t::none);
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
BOOST_TEST_EQ(act.ec_, error_code());
// Finish
act = fsm.resume(bytes_read, ec, cancellation_type_t::none);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
}
void test_parse_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -171,26 +181,27 @@ void test_parse_error()
BOOST_TEST_EQ(act.type_, action::type::append_some);
// The fsm is asking for data.
mpx.get_read_buffer().append(">a\r\n");
auto const bytes_read = mpx.get_read_buffer().size();
std::string const payload = ">a\r\n";
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
BOOST_TEST_EQ(act.ec_, error_code());
// Finish
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
act = fsm.resume(0, {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number});
}
void test_push_deliver_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -201,16 +212,16 @@ void test_push_deliver_error()
BOOST_TEST_EQ(act.type_, action::type::append_some);
// The fsm is asking for data.
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
auto const bytes_read = mpx.get_read_buffer().size();
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.ec_, error_code());
// Resumes from notifying a push with an error.
act = fsm.resume(bytes_read, net::error::operation_aborted, cancellation_type_t::none);
act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
// Finish
@@ -219,10 +230,40 @@ void test_push_deliver_error()
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
}
void test_max_read_buffer_size()
{
read_buffer rbuf;
rbuf.set_config({5, 7});
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
// Passes the first part to the fsm.
std::string const part1 = ">3\r\n";
append_read_data(rbuf, part1);
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume({}, {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size);
}
} // namespace
int main()
{
test_max_read_buffer_size();
test_push_deliver_error();
test_read_needs_more();
test_push();