2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

More code review changes.

This commit is contained in:
Marcelo Zimbres
2025-07-27 22:14:31 +02:00
parent 16bf57cf33
commit a76a621b0b
11 changed files with 123 additions and 114 deletions

View File

@@ -19,18 +19,6 @@
namespace boost::redis {
/** @brief Parse events that an adapter must support.
*/
enum class parse_event
{
/// Called before the parser starts processing data
init,
/// Called for each and every node of RESP3 data
node,
/// Called when done processing a complete RESP3 message
done
};
/** @brief A type-erased reference to a response.
*
* A type-erased response adapter. It can be executed using @ref connection::async_exec.
@@ -41,63 +29,86 @@ enum class parse_event
*
* @code
* co_await conn.async_exec(req, resp);
* co_await conn.async_exec(req, any_adapter(...));
* co_await conn.async_exec(req, any_response(resp));
* @endcode
*/
using any_adapter = std::function<void(parse_event, resp3::node_view const&, system::error_code&)>;
namespace detail {
template <class T>
auto make_any_adapter(T& resp) -> any_adapter
{
using namespace boost::redis::adapter;
return [adapter = boost_redis_adapt(
resp)](parse_event ev, resp3::node_view const& nd, system::error_code& ec) mutable {
switch (ev) {
case parse_event::init: adapter.on_init(); break;
case parse_event::node: adapter.on_node(nd, ec); break;
case parse_event::done: adapter.on_done(); break;
}
};
}
class any_adapter_wrapper {
class any_adapter {
public:
any_adapter_wrapper(any_adapter adapter = {}, std::size_t expected_responses = 0u)
: adapter_{std::move(adapter)}
, expected_responses_{expected_responses}
/** @brief Parse events that an adapter must support.
*/
enum class parse_event
{
/// Called before the parser starts processing data
init,
/// Called for each and every node of RESP3 data
node,
/// Called when done processing a complete RESP3 message
done
};
/// The type erased implementation type.
using impl_t = std::function<void(parse_event, resp3::node_view const&, system::error_code&)>;
template <class T>
static auto create_impl(T& resp) -> impl_t
{
using namespace boost::redis::adapter;
return [adapter2 = boost_redis_adapt(resp)](
any_adapter::parse_event ev,
resp3::node_view const& nd,
system::error_code& ec) mutable {
switch (ev) {
case parse_event::init: adapter2.on_init(); break;
case parse_event::node: adapter2.on_node(nd, ec); break;
case parse_event::done: adapter2.on_done(); break;
}
};
}
/// Contructs from a type erased adaper
any_adapter(impl_t fn = [](parse_event, resp3::node_view const&, system::error_code&) { })
: impl_{std::move(fn)}
{ }
/**
* @brief Constructor.
*
* Creates a type-erased response adapter from `resp` by calling
* `boost_redis_adapt`. `T` must be a valid Redis response type.
* Any type passed to @ref connection::async_exec qualifies.
*
* This object stores a reference to `resp`, which must be kept alive
* while `*this` is being used.
*/
template <class T, class = std::enable_if_t<!std::is_same_v<T, any_adapter>>>
explicit any_adapter(T& resp)
: impl_(create_impl(resp))
{ }
/// Calls the implementation with the arguments `impl_(parse_event::init, ...);`
void on_init()
{
system::error_code ec;
adapter_(parse_event::init, {}, ec);
impl_(parse_event::init, {}, ec);
};
/// Calls the implementation with the arguments `impl_(parse_event::done, ...);`
void on_done()
{
system::error_code ec;
adapter_(parse_event::done, {}, ec);
BOOST_ASSERT(expected_responses_ != 0u);
expected_responses_ -= 1;
impl_(parse_event::done, {}, ec);
};
/// Calls the implementation with the arguments `impl_(parse_event::node, ...);`
void on_node(resp3::node_view const& nd, system::error_code& ec)
{
adapter_(parse_event::node, nd, ec);
impl_(parse_event::node, nd, ec);
};
auto get_remaining_responses() const -> std::size_t { return expected_responses_; }
private:
any_adapter adapter_;
std::size_t expected_responses_ = 0;
impl_t impl_;
};
} // namespace detail
} // namespace boost::redis
#endif

View File

@@ -138,7 +138,7 @@ struct connection_impl {
, health_checker_{ex}
, logger_{std::move(lgr)}
{
set_receive_response(ignore);
set_receive_adapter(any_adapter{ignore});
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
// Reserve some memory to avoid excessive memory allocations in
@@ -200,13 +200,7 @@ struct connection_impl {
writer_timer_);
}
template <class Response>
void set_receive_response(Response& response)
{
auto adapter = detail::make_any_adapter(response);
mpx_.set_receive_response(std::move(adapter));
}
void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); }
};
template <class Executor>
@@ -506,11 +500,10 @@ public:
executor_type ex,
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
logger lgr = {})
: impl_(
std::make_unique<detail::connection_impl<Executor>>(
std::move(ex),
std::move(ctx),
std::move(lgr)))
: impl_(std::make_unique<detail::connection_impl<Executor>>(
std::move(ex),
std::move(ctx),
std::move(lgr)))
{ }
/** @brief Constructor from an executor and a logger.
@@ -770,7 +763,7 @@ public:
{
return this->async_exec(
req,
detail::make_any_adapter(resp),
any_adapter{resp},
std::forward<CompletionToken>(token));
}
@@ -897,9 +890,9 @@ public:
/// Sets the response object of @ref async_receive operations.
template <class Response>
void set_receive_response(Response& response)
void set_receive_response(Response& resp)
{
impl_->set_receive_response(response);
impl_->set_receive_adapter(any_adapter{resp});
}
/// Returns connection usage information.
@@ -1076,7 +1069,10 @@ public:
template <class Response = ignore_t, class CompletionToken = asio::deferred_t>
auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
{
return async_exec(req, detail::make_any_adapter(resp), std::forward<CompletionToken>(token));
return async_exec(
req,
any_adapter{resp},
std::forward<CompletionToken>(token));
}
/**

View File

@@ -51,7 +51,10 @@ public:
}
BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, make_any_adapter(checker_->resp_), std::move(self));
conn_->async_exec(
checker_->req_,
any_adapter{checker_->resp_},
std::move(self));
if (ec) {
conn_->logger_.trace("ping_op (3)", ec);
checker_->wait_timer_.cancel();

View File

@@ -85,9 +85,11 @@ public:
auto get_read_size() const -> std::size_t { return read_size_; }
auto get_remaining_responses() const -> std::size_t { return remaining_responses_; }
auto commit_response(std::size_t read_size) -> void;
auto get_adapter() -> any_adapter_wrapper& { return adapter_; }
auto get_adapter() -> any_adapter& { return adapter_; }
private:
enum class status
@@ -99,8 +101,7 @@ public:
};
request const* req_;
any_adapter_wrapper adapter_;
any_adapter adapter_;
std::function<void()> done_;
// Contains the number of commands that haven't been read yet.
@@ -153,7 +154,7 @@ public:
return std::string_view{write_buffer_};
}
void set_receive_response(any_adapter adapter);
void set_receive_adapter(any_adapter adapter);
[[nodiscard]]
auto get_usage() const noexcept -> usage
@@ -186,7 +187,7 @@ private:
bool on_push_ = false;
bool cancel_run_called_ = false;
usage usage_;
any_adapter_wrapper receive_adapter_;
any_adapter receive_adapter_;
};
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>;

View File

@@ -42,8 +42,9 @@ struct hello_op {
BOOST_ASIO_CORO_YIELD
conn_->async_exec(
handshaker_->hello_req_,
make_any_adapter(handshaker_->hello_resp_),
any_adapter{handshaker_->hello_resp_},
std::move(self));
conn_->logger_.on_hello(ec, handshaker_->hello_resp_);
if (ec) {

View File

@@ -13,7 +13,8 @@ namespace boost::redis::detail {
multiplexer::elem::elem(request const& req, any_adapter adapter)
: req_{&req}
, adapter_{any_adapter_wrapper{adapter, req.get_expected_responses()}}
, adapter_{std::move(adapter)}
, remaining_responses_{req.get_expected_responses()}
, status_{status::waiting}
, ec_{}
, read_size_{0}
@@ -104,7 +105,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
"Not waiting for a response (using MONITOR command perhaps?)");
BOOST_ASSERT(!reqs_.empty());
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->get_adapter().get_remaining_responses() != 0);
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec))
return std::nullopt;
@@ -116,7 +117,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
}
reqs_.front()->commit_response(parser_.get_consumed());
if (reqs_.front()->get_adapter().get_remaining_responses() == 0) {
if (reqs_.front()->get_remaining_responses() == 0) {
// Done with this request.
reqs_.front()->notify_done();
reqs_.pop_front();
@@ -268,7 +269,7 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept
// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->get_adapter().get_remaining_responses() == 0)
if (reqs_.front()->get_remaining_responses() == 0)
return true;
// Added to deal with MONITOR and also to fix PR170 which
@@ -308,9 +309,9 @@ bool multiplexer::is_waiting_response() const noexcept
bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); }
void multiplexer::set_receive_response(any_adapter adapter)
void multiplexer::set_receive_adapter(any_adapter adapter)
{
receive_adapter_ = any_adapter_wrapper{std::move(adapter), static_cast<std::size_t>(-1)};
receive_adapter_ = std::move(adapter);
}
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -16,7 +16,7 @@ using boost::redis::generic_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;
using boost::redis::detail::make_any_adapter;
using boost::redis::any_adapter;
BOOST_AUTO_TEST_CASE(any_adapter_response_types)
{
@@ -25,17 +25,17 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types)
response<int, std::string> r2;
generic_response r3;
BOOST_CHECK_NO_THROW(make_any_adapter(r1));
BOOST_CHECK_NO_THROW(make_any_adapter(r2));
BOOST_CHECK_NO_THROW(make_any_adapter(r3));
BOOST_CHECK_NO_THROW(make_any_adapter(ignore));
BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
}
BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
{
// any_adapter can be copied/moved
response<int, std::string> r;
auto ad1 = make_any_adapter(r);
auto ad1 = any_adapter{r};
// copy constructor
auto ad2 = any_adapter(ad1);

View File

@@ -31,7 +31,7 @@ using boost::redis::ignore;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::detail::make_any_adapter;
using boost::redis::any_adapter;
using boost::system::error_code;
using namespace std::chrono_literals;
@@ -230,7 +230,7 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
bool finished = false;
conn->async_exec(req, make_any_adapter(res), [&](error_code ec, std::size_t) {
conn->async_exec(req, res, [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->cancel();
finished = true;

View File

@@ -83,10 +83,7 @@ struct elem_and_request {
{
// Empty requests are not valid. The request needs to be populated before creating the element
req.push("get", "mykey");
elm = std::make_shared<multiplexer::elem>(
req,
[](parse_event, resp3::node_view const&, error_code&) { });
elm = std::make_shared<multiplexer::elem>(req, any_adapter{});
elm->set_done_callback([this] {
++done_calls;

View File

@@ -20,19 +20,19 @@
#include <iostream>
#include <string>
using boost::redis::request;
using boost::redis::config;
using boost::redis::detail::push_hello;
using boost::redis::response;
using boost::redis::adapter::adapt2;
using boost::redis::adapter::result;
using boost::redis::resp3::detail::deserialize;
using boost::redis::ignore_t;
using boost::redis::config;
using boost::redis::detail::multiplexer;
using boost::redis::detail::push_hello;
using boost::redis::generic_response;
using boost::redis::ignore_t;
using boost::redis::request;
using boost::redis::resp3::detail::deserialize;
using boost::redis::resp3::node;
using boost::redis::resp3::to_string;
using boost::redis::detail::make_any_adapter;
using boost::redis::response;
using boost::redis::any_adapter;
using boost::system::error_code;
#define RESP3_SET_PART1 "~6\r\n+orange\r"
@@ -265,7 +265,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
boost::system::error_code ec;
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
@@ -287,7 +287,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
std::string msg;
// Only part of the message.
@@ -323,7 +323,7 @@ struct test_item {
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
elem_ptr = std::make_shared<multiplexer::elem>(req, make_any_adapter(resp));
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter{resp});
elem_ptr->set_done_callback([this]() {
done = true;
@@ -470,8 +470,7 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
BOOST_AUTO_TEST_CASE(check_counter_adapter)
{
using boost::redis::parse_event;
using boost::redis::detail::any_adapter_wrapper;
using boost::redis::any_adapter;
using boost::redis::resp3::parse;
using boost::redis::resp3::parser;
using boost::redis::resp3::node_view;
@@ -481,15 +480,15 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter)
int node = 0;
int done = 0;
auto counter_adapter = [&](parse_event ev, node_view const&, error_code&) mutable {
auto counter_adapter = [&](any_adapter::parse_event ev, node_view const&, error_code&) mutable {
switch (ev) {
case parse_event::init: init++; break;
case parse_event::node: node++; break;
case parse_event::done: done++; break;
case any_adapter::parse_event::init: init++; break;
case any_adapter::parse_event::node: node++; break;
case any_adapter::parse_event::done: done++; break;
}
};
any_adapter_wrapper wrapped{counter_adapter, 1};
any_adapter wrapped{any_adapter::impl_t{counter_adapter}};
error_code ec;
parser p;

View File

@@ -23,7 +23,7 @@ using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::detail::read_buffer;
using redis::generic_response;
using redis::detail::make_any_adapter;
using redis::any_adapter;
using action = redis::detail::reader_fsm::action;
namespace boost::redis::detail {
@@ -45,7 +45,7 @@ void test_push()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -93,7 +93,7 @@ void test_read_needs_more()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -139,7 +139,7 @@ void test_read_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -170,7 +170,7 @@ void test_parse_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -201,7 +201,7 @@ void test_push_deliver_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -237,7 +237,7 @@ void test_max_read_buffer_size()
rbuf.set_config({5, 7});
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(make_any_adapter(resp));
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;