From a76a621b0b936dc4a50ddfb92a3c3a49457cc659 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 27 Jul 2025 22:14:31 +0200 Subject: [PATCH] More code review changes. --- include/boost/redis/adapter/any_adapter.hpp | 105 ++++++++++-------- include/boost/redis/connection.hpp | 30 +++-- include/boost/redis/detail/health_checker.hpp | 5 +- include/boost/redis/detail/multiplexer.hpp | 11 +- .../boost/redis/detail/resp3_handshaker.hpp | 3 +- include/boost/redis/impl/multiplexer.ipp | 13 ++- test/test_any_adapter.cpp | 14 +-- test/test_conn_exec.cpp | 4 +- test/test_exec_fsm.cpp | 5 +- test/test_low_level_sync_sans_io.cpp | 33 +++--- test/test_reader_fsm.cpp | 14 +-- 11 files changed, 123 insertions(+), 114 deletions(-) diff --git a/include/boost/redis/adapter/any_adapter.hpp b/include/boost/redis/adapter/any_adapter.hpp index 88dd5aa3..a32e2545 100644 --- a/include/boost/redis/adapter/any_adapter.hpp +++ b/include/boost/redis/adapter/any_adapter.hpp @@ -19,18 +19,6 @@ namespace boost::redis { -/** @brief Parse events that an adapter must support. - */ -enum class parse_event -{ - /// Called before the parser starts processing data - init, - /// Called for each and every node of RESP3 data - node, - /// Called when done processing a complete RESP3 message - done -}; - /** @brief A type-erased reference to a response. * * A type-erased response adapter. It can be executed using @ref connection::async_exec. @@ -41,63 +29,86 @@ enum class parse_event * * @code * co_await conn.async_exec(req, resp); - * co_await conn.async_exec(req, any_adapter(...)); + * co_await conn.async_exec(req, any_response(resp)); * @endcode */ -using any_adapter = std::function; - -namespace detail { - -template -auto make_any_adapter(T& resp) -> any_adapter -{ - using namespace boost::redis::adapter; - - return [adapter = boost_redis_adapt( - resp)](parse_event ev, resp3::node_view const& nd, system::error_code& ec) mutable { - switch (ev) { - case parse_event::init: adapter.on_init(); break; - case parse_event::node: adapter.on_node(nd, ec); break; - case parse_event::done: adapter.on_done(); break; - } - }; -} - -class any_adapter_wrapper { +class any_adapter { public: - any_adapter_wrapper(any_adapter adapter = {}, std::size_t expected_responses = 0u) - : adapter_{std::move(adapter)} - , expected_responses_{expected_responses} + /** @brief Parse events that an adapter must support. + */ + enum class parse_event + { + /// Called before the parser starts processing data + init, + /// Called for each and every node of RESP3 data + node, + /// Called when done processing a complete RESP3 message + done + }; + + /// The type erased implementation type. + using impl_t = std::function; + + template + static auto create_impl(T& resp) -> impl_t + { + using namespace boost::redis::adapter; + return [adapter2 = boost_redis_adapt(resp)]( + any_adapter::parse_event ev, + resp3::node_view const& nd, + system::error_code& ec) mutable { + switch (ev) { + case parse_event::init: adapter2.on_init(); break; + case parse_event::node: adapter2.on_node(nd, ec); break; + case parse_event::done: adapter2.on_done(); break; + } + }; + } + + /// Contructs from a type erased adaper + any_adapter(impl_t fn = [](parse_event, resp3::node_view const&, system::error_code&) { }) + : impl_{std::move(fn)} { } + /** + * @brief Constructor. + * + * Creates a type-erased response adapter from `resp` by calling + * `boost_redis_adapt`. `T` must be a valid Redis response type. + * Any type passed to @ref connection::async_exec qualifies. + * + * This object stores a reference to `resp`, which must be kept alive + * while `*this` is being used. + */ + template >> + explicit any_adapter(T& resp) + : impl_(create_impl(resp)) + { } + + /// Calls the implementation with the arguments `impl_(parse_event::init, ...);` void on_init() { system::error_code ec; - adapter_(parse_event::init, {}, ec); + impl_(parse_event::init, {}, ec); }; + /// Calls the implementation with the arguments `impl_(parse_event::done, ...);` void on_done() { system::error_code ec; - adapter_(parse_event::done, {}, ec); - BOOST_ASSERT(expected_responses_ != 0u); - expected_responses_ -= 1; + impl_(parse_event::done, {}, ec); }; + /// Calls the implementation with the arguments `impl_(parse_event::node, ...);` void on_node(resp3::node_view const& nd, system::error_code& ec) { - adapter_(parse_event::node, nd, ec); + impl_(parse_event::node, nd, ec); }; - auto get_remaining_responses() const -> std::size_t { return expected_responses_; } - private: - any_adapter adapter_; - std::size_t expected_responses_ = 0; + impl_t impl_; }; -} // namespace detail - } // namespace boost::redis #endif diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 66972fe7..57df0915 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -138,7 +138,7 @@ struct connection_impl { , health_checker_{ex} , logger_{std::move(lgr)} { - set_receive_response(ignore); + set_receive_adapter(any_adapter{ignore}); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); // Reserve some memory to avoid excessive memory allocations in @@ -200,13 +200,7 @@ struct connection_impl { writer_timer_); } - template - void set_receive_response(Response& response) - { - auto adapter = detail::make_any_adapter(response); - mpx_.set_receive_response(std::move(adapter)); - } - + void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); } }; template @@ -506,11 +500,10 @@ public: executor_type ex, asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, logger lgr = {}) - : impl_( - std::make_unique>( - std::move(ex), - std::move(ctx), - std::move(lgr))) + : impl_(std::make_unique>( + std::move(ex), + std::move(ctx), + std::move(lgr))) { } /** @brief Constructor from an executor and a logger. @@ -770,7 +763,7 @@ public: { return this->async_exec( req, - detail::make_any_adapter(resp), + any_adapter{resp}, std::forward(token)); } @@ -897,9 +890,9 @@ public: /// Sets the response object of @ref async_receive operations. template - void set_receive_response(Response& response) + void set_receive_response(Response& resp) { - impl_->set_receive_response(response); + impl_->set_receive_adapter(any_adapter{resp}); } /// Returns connection usage information. @@ -1076,7 +1069,10 @@ public: template auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {}) { - return async_exec(req, detail::make_any_adapter(resp), std::forward(token)); + return async_exec( + req, + any_adapter{resp}, + std::forward(token)); } /** diff --git a/include/boost/redis/detail/health_checker.hpp b/include/boost/redis/detail/health_checker.hpp index a1f6a177..c4a7eccb 100644 --- a/include/boost/redis/detail/health_checker.hpp +++ b/include/boost/redis/detail/health_checker.hpp @@ -51,7 +51,10 @@ public: } BOOST_ASIO_CORO_YIELD - conn_->async_exec(checker_->req_, make_any_adapter(checker_->resp_), std::move(self)); + conn_->async_exec( + checker_->req_, + any_adapter{checker_->resp_}, + std::move(self)); if (ec) { conn_->logger_.trace("ping_op (3)", ec); checker_->wait_timer_.cancel(); diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 3fbaf755..1c495759 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -85,9 +85,11 @@ public: auto get_read_size() const -> std::size_t { return read_size_; } + auto get_remaining_responses() const -> std::size_t { return remaining_responses_; } + auto commit_response(std::size_t read_size) -> void; - auto get_adapter() -> any_adapter_wrapper& { return adapter_; } + auto get_adapter() -> any_adapter& { return adapter_; } private: enum class status @@ -99,8 +101,7 @@ public: }; request const* req_; - any_adapter_wrapper adapter_; - + any_adapter adapter_; std::function done_; // Contains the number of commands that haven't been read yet. @@ -153,7 +154,7 @@ public: return std::string_view{write_buffer_}; } - void set_receive_response(any_adapter adapter); + void set_receive_adapter(any_adapter adapter); [[nodiscard]] auto get_usage() const noexcept -> usage @@ -186,7 +187,7 @@ private: bool on_push_ = false; bool cancel_run_called_ = false; usage usage_; - any_adapter_wrapper receive_adapter_; + any_adapter receive_adapter_; }; auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr; diff --git a/include/boost/redis/detail/resp3_handshaker.hpp b/include/boost/redis/detail/resp3_handshaker.hpp index 7165a4dc..06eef7ae 100644 --- a/include/boost/redis/detail/resp3_handshaker.hpp +++ b/include/boost/redis/detail/resp3_handshaker.hpp @@ -42,8 +42,9 @@ struct hello_op { BOOST_ASIO_CORO_YIELD conn_->async_exec( handshaker_->hello_req_, - make_any_adapter(handshaker_->hello_resp_), + any_adapter{handshaker_->hello_resp_}, std::move(self)); + conn_->logger_.on_hello(ec, handshaker_->hello_resp_); if (ec) { diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index db2b44a5..c77e2d86 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -13,7 +13,8 @@ namespace boost::redis::detail { multiplexer::elem::elem(request const& req, any_adapter adapter) : req_{&req} -, adapter_{any_adapter_wrapper{adapter, req.get_expected_responses()}} +, adapter_{std::move(adapter)} +, remaining_responses_{req.get_expected_responses()} , status_{status::waiting} , ec_{} , read_size_{0} @@ -104,7 +105,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code "Not waiting for a response (using MONITOR command perhaps?)"); BOOST_ASSERT(!reqs_.empty()); BOOST_ASSERT(reqs_.front() != nullptr); - BOOST_ASSERT(reqs_.front()->get_adapter().get_remaining_responses() != 0); + BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0); if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec)) return std::nullopt; @@ -116,7 +117,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code } reqs_.front()->commit_response(parser_.get_consumed()); - if (reqs_.front()->get_adapter().get_remaining_responses() == 0) { + if (reqs_.front()->get_remaining_responses() == 0) { // Done with this request. reqs_.front()->notify_done(); reqs_.pop_front(); @@ -268,7 +269,7 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept // The request does not expect any response but we got one. This // may happen if for example, subscribe with wrong syntax. - if (reqs_.front()->get_adapter().get_remaining_responses() == 0) + if (reqs_.front()->get_remaining_responses() == 0) return true; // Added to deal with MONITOR and also to fix PR170 which @@ -308,9 +309,9 @@ bool multiplexer::is_waiting_response() const noexcept bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); } -void multiplexer::set_receive_response(any_adapter adapter) +void multiplexer::set_receive_adapter(any_adapter adapter) { - receive_adapter_ = any_adapter_wrapper{std::move(adapter), static_cast(-1)}; + receive_adapter_ = std::move(adapter); } auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr diff --git a/test/test_any_adapter.cpp b/test/test_any_adapter.cpp index 36f0b042..f0345ac1 100644 --- a/test/test_any_adapter.cpp +++ b/test/test_any_adapter.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -16,7 +16,7 @@ using boost::redis::generic_response; using boost::redis::response; using boost::redis::ignore; using boost::redis::any_adapter; -using boost::redis::detail::make_any_adapter; +using boost::redis::any_adapter; BOOST_AUTO_TEST_CASE(any_adapter_response_types) { @@ -25,17 +25,17 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types) response r2; generic_response r3; - BOOST_CHECK_NO_THROW(make_any_adapter(r1)); - BOOST_CHECK_NO_THROW(make_any_adapter(r2)); - BOOST_CHECK_NO_THROW(make_any_adapter(r3)); - BOOST_CHECK_NO_THROW(make_any_adapter(ignore)); + BOOST_CHECK_NO_THROW(any_adapter{r1}); + BOOST_CHECK_NO_THROW(any_adapter{r2}); + BOOST_CHECK_NO_THROW(any_adapter{r3}); + BOOST_CHECK_NO_THROW(any_adapter{ignore}); } BOOST_AUTO_TEST_CASE(any_adapter_copy_move) { // any_adapter can be copied/moved response r; - auto ad1 = make_any_adapter(r); + auto ad1 = any_adapter{r}; // copy constructor auto ad2 = any_adapter(ad1); diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index 7d86f0c7..4c69cdda 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -31,7 +31,7 @@ using boost::redis::ignore; using boost::redis::operation; using boost::redis::request; using boost::redis::response; -using boost::redis::detail::make_any_adapter; +using boost::redis::any_adapter; using boost::system::error_code; using namespace std::chrono_literals; @@ -230,7 +230,7 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter) bool finished = false; - conn->async_exec(req, make_any_adapter(res), [&](error_code ec, std::size_t) { + conn->async_exec(req, res, [&](error_code ec, std::size_t) { BOOST_TEST(ec == error_code()); conn->cancel(); finished = true; diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 29a5e5c8..1d73864b 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -83,10 +83,7 @@ struct elem_and_request { { // Empty requests are not valid. The request needs to be populated before creating the element req.push("get", "mykey"); - - elm = std::make_shared( - req, - [](parse_event, resp3::node_view const&, error_code&) { }); + elm = std::make_shared(req, any_adapter{}); elm->set_done_callback([this] { ++done_calls; diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 0d59eb3f..9b198c4b 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -20,19 +20,19 @@ #include #include -using boost::redis::request; -using boost::redis::config; -using boost::redis::detail::push_hello; -using boost::redis::response; using boost::redis::adapter::adapt2; using boost::redis::adapter::result; -using boost::redis::resp3::detail::deserialize; -using boost::redis::ignore_t; +using boost::redis::config; using boost::redis::detail::multiplexer; +using boost::redis::detail::push_hello; using boost::redis::generic_response; +using boost::redis::ignore_t; +using boost::redis::request; +using boost::redis::resp3::detail::deserialize; using boost::redis::resp3::node; using boost::redis::resp3::to_string; -using boost::redis::detail::make_any_adapter; +using boost::redis::response; +using boost::redis::any_adapter; using boost::system::error_code; #define RESP3_SET_PART1 "~6\r\n+orange\r" @@ -265,7 +265,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) { multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); boost::system::error_code ec; auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); @@ -287,7 +287,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) { multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); std::string msg; // Only part of the message. @@ -323,7 +323,7 @@ struct test_item { // to Redis. req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); - elem_ptr = std::make_shared(req, make_any_adapter(resp)); + elem_ptr = std::make_shared(req, any_adapter{resp}); elem_ptr->set_done_callback([this]() { done = true; @@ -470,8 +470,7 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size) BOOST_AUTO_TEST_CASE(check_counter_adapter) { - using boost::redis::parse_event; - using boost::redis::detail::any_adapter_wrapper; + using boost::redis::any_adapter; using boost::redis::resp3::parse; using boost::redis::resp3::parser; using boost::redis::resp3::node_view; @@ -481,15 +480,15 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter) int node = 0; int done = 0; - auto counter_adapter = [&](parse_event ev, node_view const&, error_code&) mutable { + auto counter_adapter = [&](any_adapter::parse_event ev, node_view const&, error_code&) mutable { switch (ev) { - case parse_event::init: init++; break; - case parse_event::node: node++; break; - case parse_event::done: done++; break; + case any_adapter::parse_event::init: init++; break; + case any_adapter::parse_event::node: node++; break; + case any_adapter::parse_event::done: done++; break; } }; - any_adapter_wrapper wrapped{counter_adapter, 1}; + any_adapter wrapped{any_adapter::impl_t{counter_adapter}}; error_code ec; parser p; diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index cda286df..db11df7f 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -23,7 +23,7 @@ using redis::detail::reader_fsm; using redis::detail::multiplexer; using redis::detail::read_buffer; using redis::generic_response; -using redis::detail::make_any_adapter; +using redis::any_adapter; using action = redis::detail::reader_fsm::action; namespace boost::redis::detail { @@ -45,7 +45,7 @@ void test_push() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -93,7 +93,7 @@ void test_read_needs_more() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -139,7 +139,7 @@ void test_read_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -170,7 +170,7 @@ void test_parse_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -201,7 +201,7 @@ void test_push_deliver_error() read_buffer rbuf; multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -237,7 +237,7 @@ void test_max_read_buffer_size() rbuf.set_config({5, 7}); multiplexer mpx; generic_response resp; - mpx.set_receive_response(make_any_adapter(resp)); + mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act;