2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Code review changes.

This commit is contained in:
Marcelo Zimbres
2025-07-18 00:04:16 +02:00
parent 97d71d1d6b
commit 8ee2213efe
13 changed files with 94 additions and 96 deletions

View File

@@ -88,7 +88,7 @@ struct config {
*/
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
/** @brief Maximum size of the read-buffer in bytes.
/** @brief Maximum size of the socket read-buffer in bytes.
*
* Sets a limit on how much data is allowed to be read into the
* read buffer. It can be used to prevent DDOS.
@@ -98,8 +98,9 @@ struct config {
/** @brief read_buffer_append_size
*
* The size by which the read buffer grows when more space is
* needed. There is no need to set this too high because memory is
* reused and the growth will tend to zero.
* needed. This can help avoiding some memory allocations. Once the
* maximum size is reached no more memory allocations are made
* since the buffer is reused.
*/
std::size_t read_buffer_append_size = 4096;
};

View File

@@ -160,17 +160,19 @@ struct writer_op {
template <class Conn>
struct reader_op {
Conn* conn_;
detail::reader_fsm fsm_;
public:
reader_op(Conn& conn) noexcept
: conn_{&conn}
, fsm_{conn.read_buffer_, conn.mpx_}
{ }
template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
for (;;) {
auto act = conn_->read_fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
conn_->logger_.on_fsm_resume(act);
@@ -181,7 +183,7 @@ public:
case reader_fsm::action::type::needs_more:
case reader_fsm::action::type::append_some:
{
auto const buf = conn_->read_fsm_.get_append_buffer();
auto const buf = conn_->read_buffer_.get_append_buffer();
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
}
return;
@@ -276,8 +278,8 @@ public:
// If we were successful, run all the connection tasks
if (!ec) {
conn_->read_buffer_.clear();
conn_->mpx_.reset();
conn_->read_fsm_.reset();
// Note: Order is important here because the writer might
// trigger an async_write before the async_hello thereby
@@ -385,7 +387,6 @@ public:
, reconnect_timer_{ex}
, receive_channel_{ex, 256}
, health_checker_{ex}
, read_fsm_{mpx_}
, logger_{std::move(lgr)}
{
set_receive_response(ignore);
@@ -489,7 +490,11 @@ public:
cfg_ = cfg;
health_checker_.set_config(cfg);
handshaker_.set_config(cfg);
read_fsm_.set_config({cfg_.read_buffer_append_size, cfg_.max_read_size});
read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
// Reserve some memory to avoid excessive memory allocations in
// the first reads.
read_buffer_.reserve(4048u);
return asio::async_compose<CompletionToken, void(system::error_code)>(
detail::run_op<this_type>{this},
@@ -887,8 +892,8 @@ private:
resp3_handshaker_type handshaker_;
config cfg_;
detail::read_buffer read_buffer_;
detail::multiplexer mpx_;
detail::reader_fsm read_fsm_;
detail::connection_logger logger_;
};

View File

@@ -14,7 +14,7 @@
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/usage.hpp>
#include <boost/system.hpp>
#include <boost/system/error_code.hpp>
#include <algorithm>
#include <deque>

View File

@@ -8,11 +8,12 @@
#define BOOST_REDIS_READ_BUFFER_HPP
#include <boost/core/span.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <vector>
#include <string_view>
#include <utility>
#include <vector>
namespace boost::redis::detail {
@@ -20,14 +21,20 @@ class read_buffer {
public:
using span_type = span<char>;
[[nodiscard]]
system::error_code prepare_append(std::size_t append_size, std::size_t max_buffer_size);
// See config.hpp for the meaning of these parameters.
struct config {
std::size_t read_buffer_append_size = 4096u;
std::size_t max_read_size = static_cast<std::size_t>(-1);
};
void commit_append(std::size_t read_size);
[[nodiscard]]
auto prepare_append() -> system::error_code;
[[nodiscard]]
auto get_append_buffer() noexcept -> span_type;
void commit_append(std::size_t read_size);
[[nodiscard]]
auto get_committed_buffer() const noexcept -> std::string_view;
@@ -41,13 +48,14 @@ public:
void reserve(std::size_t n);
friend
bool operator==(read_buffer const& lhs, read_buffer const& rhs);
friend bool operator==(read_buffer const& lhs, read_buffer const& rhs);
friend
bool operator!=(read_buffer const& lhs, read_buffer const& rhs);
friend bool operator!=(read_buffer const& lhs, read_buffer const& rhs);
void set_config(config const& cfg) noexcept { cfg_ = cfg; };
private:
config cfg_ = config{};
std::vector<char> buffer_;
std::size_t append_buf_begin_ = 0;
};

View File

@@ -6,7 +6,6 @@
#ifndef BOOST_REDIS_READER_FSM_HPP
#define BOOST_REDIS_READER_FSM_HPP
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/asio/cancellation_type.hpp>
@@ -16,14 +15,10 @@
namespace boost::redis::detail {
class read_buffer;
class reader_fsm {
public:
// See config.hpp for the meaning of these parameters.
struct config {
std::size_t read_buffer_append_size = 4096;
std::size_t max_read_size = -1;
};
struct action {
enum class type
{
@@ -36,31 +31,20 @@ public:
};
type type_ = type::setup_cancellation;
std::size_t push_size_ = 0;
std::size_t push_size_ = 0u;
system::error_code ec_ = {};
};
explicit reader_fsm(multiplexer& mpx) noexcept;
explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept;
action resume(
std::size_t bytes_read,
system::error_code ec,
asio::cancellation_type_t /*cancel_state*/);
void set_config(config const& cfg) noexcept { cfg_ = cfg; };
void reset();
[[nodiscard]]
auto get_append_buffer() noexcept
{
return read_buffer_.get_append_buffer();
}
private:
int resume_point_{0};
read_buffer read_buffer_;
config cfg_;
read_buffer* read_buffer_ = nullptr;
action action_after_resume_;
action::type next_read_type_ = action::type::append_some;
multiplexer* mpx_ = nullptr;

View File

@@ -89,7 +89,7 @@ enum class error
/// The configuration specified UNIX sockets with SSL, which is not supported.
unix_sockets_ssl_unsupported,
/// The size of the read buffer would exceed it maximum configured value.
/// Reading data from the socket would exceed the maximum size allowed of the read buffer.
exceeds_maximum_read_buffer_size,
};

View File

@@ -51,7 +51,8 @@ struct error_category_impl : system::error_category {
case error::unix_sockets_ssl_unsupported:
return "The configuration specified UNIX sockets with SSL, which is not supported.";
case error::exceeds_maximum_read_buffer_size:
return "The size of the read buffer would exceed it maximum configured value";
return "Reading data from the socket would exceed the maximum size allowed of the read "
"buffer.";
default: BOOST_ASSERT(false); return "Boost.Redis error.";
}
}

View File

@@ -13,14 +13,13 @@
namespace boost::redis::detail {
system::error_code
read_buffer::prepare_append(std::size_t append_size, std::size_t max_buffer_size)
system::error_code read_buffer::prepare_append()
{
BOOST_ASSERT(append_buf_begin_ == buffer_.size());
auto const new_size = append_buf_begin_ + append_size;
auto const new_size = append_buf_begin_ + cfg_.read_buffer_append_size;
if (new_size > max_buffer_size) {
if (new_size > cfg_.max_read_size) {
return error::exceeds_maximum_read_buffer_size;
}
@@ -57,7 +56,8 @@ void read_buffer::clear()
std::size_t read_buffer::consume_committed(std::size_t size)
{
// Consumes only committed data.
// For convenience, if the requested size is larger than the
// committed buffer we cap it to the maximum.
if (size > append_buf_begin_)
size = append_buf_begin_;
@@ -67,21 +67,13 @@ std::size_t read_buffer::consume_committed(std::size_t size)
return size;
}
void read_buffer::reserve(std::size_t n)
{
buffer_.reserve(n);
}
void read_buffer::reserve(std::size_t n) { buffer_.reserve(n); }
bool operator==(read_buffer const& lhs, read_buffer const& rhs)
{
return
lhs.buffer_ == rhs.buffer_ &&
lhs.append_buf_begin_ == rhs.append_buf_begin_;
return lhs.buffer_ == rhs.buffer_ && lhs.append_buf_begin_ == rhs.append_buf_begin_;
}
bool operator!=(read_buffer const& lhs, read_buffer const& rhs)
{
return !(lhs == rhs);
}
bool operator!=(read_buffer const& lhs, read_buffer const& rhs) { return !(lhs == rhs); }
} // namespace boost::redis::detail

View File

@@ -6,12 +6,14 @@
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
namespace boost::redis::detail {
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
: mpx_{&mpx}
reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept
: read_buffer_{&rbuf}
, mpx_{&mpx}
{ }
reader_fsm::action reader_fsm::resume(
@@ -24,7 +26,7 @@ reader_fsm::action reader_fsm::resume(
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)
for (;;) {
ec = read_buffer_.prepare_append(cfg_.read_buffer_append_size, cfg_.max_read_size);
ec = read_buffer_->prepare_append();
if (ec) {
action_after_resume_ = {action::type::done, 0, ec};
BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run)
@@ -32,7 +34,7 @@ reader_fsm::action reader_fsm::resume(
}
BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_)
read_buffer_.commit_append(bytes_read);
read_buffer_->commit_append(bytes_read);
if (ec) {
// TODO: If an error occurred but data was read (i.e.
// bytes_read != 0) we should try to process that data and
@@ -43,8 +45,8 @@ reader_fsm::action reader_fsm::resume(
}
next_read_type_ = action::type::append_some;
while (read_buffer_.get_committed_size() != 0) {
res_ = mpx_->consume_next(read_buffer_.get_committed_buffer(), ec);
while (read_buffer_->get_committed_size() != 0) {
res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec);
if (ec) {
// TODO: Perhaps log what has not been consumed to aid
// debugging.
@@ -58,7 +60,7 @@ reader_fsm::action reader_fsm::resume(
break;
}
read_buffer_.consume_committed(res_.second);
read_buffer_->consume_committed(res_.second);
if (res_.first.value()) {
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
@@ -83,12 +85,4 @@ reader_fsm::action reader_fsm::resume(
return {action::type::done, 0, system::error_code()};
}
void reader_fsm::reset()
{
resume_point_ = 0;
next_read_type_ = action::type::append_some;
res_ = {std::make_pair(std::nullopt, 0)};
read_buffer_.clear();
}
} // namespace boost::redis::detail

View File

@@ -69,9 +69,9 @@ void run_coroutine_test(net::awaitable<void> op, std::chrono::steady_clock::dura
}
#endif // BOOST_ASIO_HAS_CO_AWAIT
void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data)
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data)
{
auto const buffer = fsm.get_append_buffer();
auto const buffer = rbuf.get_append_buffer();
BOOST_ASSERT(data.size() <= buffer.size());
std::copy(data.begin(), data.end(), buffer.begin());
}

View File

@@ -36,4 +36,4 @@ void run(
boost::system::error_code ec = boost::asio::error::operation_aborted,
boost::redis::operation op = boost::redis::operation::receive);
void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data);
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data);

View File

@@ -7,11 +7,11 @@
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#define BOOST_TEST_MODULE conn_quit
#include <boost/test/included/unit_test.hpp>
@@ -404,12 +404,14 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
read_buffer buf;
// Usual case, max size is bigger then requested size.
auto ec = buf.prepare_append(10, 10);
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
buf.commit_append(10);
// Corner case, max size is equal to the requested size.
ec = buf.prepare_append(10, 20);
buf.set_config({10, 20});
ec = buf.prepare_append();
BOOST_TEST(!ec);
buf.commit_append(10);
buf.consume_committed(20);
@@ -417,7 +419,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
auto const tmp = buf;
// Error case, max size is smaller to the requested size.
ec = buf.prepare_append(10, 9);
buf.set_config({10, 9});
ec = buf.prepare_append();
BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size});
// Check that an error call has no side effects.
@@ -431,7 +434,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data)
read_buffer buf;
auto ec = buf.prepare_append(10, 10);
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
// No data has been committed yet so nothing can be consummed.
@@ -452,7 +456,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
read_buffer buf;
auto ec = buf.prepare_append(10, 10);
buf.set_config({10, 10});
auto ec = buf.prepare_append();
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);

View File

@@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/asio/cancellation_type.hpp>
@@ -20,6 +21,7 @@ using boost::system::error_code;
using net::cancellation_type_t;
using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::detail::read_buffer;
using redis::generic_response;
using action = redis::detail::reader_fsm::action;
@@ -39,10 +41,11 @@ namespace {
void test_push()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -58,7 +61,7 @@ void test_push()
">1\r\n+msg2 \r\n"
">1\r\n+msg3 \r\n";
append_read_data(fsm, payload);
append_read_data(rbuf, payload);
// Deliver the 1st push
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
@@ -86,10 +89,11 @@ void test_push()
void test_read_needs_more()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -104,20 +108,20 @@ void test_read_needs_more()
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
// Passes the first part to the fsm.
append_read_data(fsm, msg[0]);
append_read_data(rbuf, msg[0]);
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the second part to the fsm.
append_read_data(fsm, msg[1]);
append_read_data(rbuf, msg[1]);
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the third and last part to the fsm, next it should ask us
// to deliver the message.
append_read_data(fsm, msg[2]);
append_read_data(rbuf, msg[2]);
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
@@ -131,10 +135,11 @@ void test_read_needs_more()
void test_read_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -146,7 +151,7 @@ void test_read_error()
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(fsm, payload);
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
@@ -161,10 +166,11 @@ void test_read_error()
void test_parse_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -176,7 +182,7 @@ void test_parse_error()
// The fsm is asking for data.
std::string const payload = ">a\r\n";
append_read_data(fsm, payload);
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
@@ -191,10 +197,11 @@ void test_parse_error()
void test_push_deliver_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -206,7 +213,7 @@ void test_push_deliver_error()
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(fsm, payload);
append_read_data(rbuf, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
@@ -225,11 +232,12 @@ void test_push_deliver_error()
void test_max_read_buffer_size()
{
read_buffer rbuf;
rbuf.set_config({5, 7});
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
reader_fsm fsm{mpx};
fsm.set_config({5, 7});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -241,7 +249,7 @@ void test_max_read_buffer_size()
// Passes the first part to the fsm.
std::string const part1 = ">3\r\n";
append_read_data(fsm, part1);
append_read_data(rbuf, part1);
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
BOOST_TEST_EQ(act.ec_, error_code());