From 6be0d122fb516278e857e423711a58204d2dc866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anarthal=20=28Rub=C3=A9n=20P=C3=A9rez=29?= <34971811+anarthal@users.noreply.github.com> Date: Mon, 20 Oct 2025 15:56:46 +0200 Subject: [PATCH] Moves the setup request execution to run_fsm (#333) Adds unit tests to cover setup request execution in run_fsm Entails no functional change --- include/boost/redis/connection.hpp | 51 +-------- .../boost/redis/detail/connection_logger.hpp | 3 +- .../boost/redis/detail/connection_state.hpp | 4 +- .../redis/detail/setup_request_utils.hpp | 3 - include/boost/redis/impl/connection.ipp | 25 ----- .../boost/redis/impl/connection_logger.ipp | 6 +- include/boost/redis/impl/error.ipp | 3 +- include/boost/redis/impl/run_fsm.ipp | 44 +++++++- .../boost/redis/impl/setup_request_utils.ipp | 11 -- test/test_run_fsm.cpp | 102 +++++++++++++++++- test/test_setup_request_utils.cpp | 31 ------ 11 files changed, 156 insertions(+), 127 deletions(-) diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 993bc526..7ad01f29 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -286,48 +287,12 @@ public: } }; -system::error_code translate_parallel_group_errors( - std::array order, - system::error_code setup_ec, - system::error_code reader_ec, - system::error_code writer_ec); - template class run_op { private: connection_impl* conn_; run_fsm fsm_{}; - static system::error_code on_setup_finished( - connection_impl& conn, - system::error_code ec) - { - ec = check_setup_response(ec, conn.st_.setup_resp); - conn.st_.logger.on_setup(ec, conn.st_.setup_resp); - return ec; - } - - template - auto send_setup(CompletionToken&& token) - { - // clang-format off - // Skip sending the setup request if it's empty - return asio::deferred_t::when(conn_->st_.cfg.setup.get_commands() != 0u) - .then( - conn_->async_exec( - conn_->st_.cfg.setup, - any_adapter(conn_->st_.setup_resp), - asio::deferred([&conn = *this->conn_](system::error_code ec, std::size_t) { - return asio::deferred.values(on_setup_finished(conn, ec)); - }) - ) - ) - .otherwise(asio::deferred.values(system::error_code())) - (std::forward(token)) - ; - // clang-format on - } - template auto reader(CompletionToken&& token) { @@ -355,12 +320,11 @@ public: template void operator()( Self& self, - std::array order, - system::error_code setup_ec, + std::array order, system::error_code reader_ec, system::error_code writer_ec) { - (*this)(self, translate_parallel_group_errors(order, setup_ec, reader_ec, writer_ec)); + (*this)(self, order[0u] == 0u ? reader_ec : writer_ec); } template @@ -377,21 +341,14 @@ public: conn_->stream_.async_connect(conn_->st_.cfg, conn_->st_.logger, std::move(self)); return; case run_action_type::parallel_group: - // Note: Order is important here because the writer might - // trigger an async_write before the setup request is sent, - // causing other requests to be sent before the setup request, - // violating the setup request contract. asio::experimental::make_parallel_group( - [this](auto token) { - return this->send_setup(token); - }, [this](auto token) { return this->reader(token); }, [this](auto token) { return this->writer(token); }) - .async_wait(asio::experimental::wait_for_one_error(), std::move(self)); + .async_wait(asio::experimental::wait_for_one(), std::move(self)); return; case run_action_type::cancel_receive: conn_->receive_channel_.cancel(); diff --git a/include/boost/redis/detail/connection_logger.hpp b/include/boost/redis/detail/connection_logger.hpp index b1cb28e6..7dbb158c 100644 --- a/include/boost/redis/detail/connection_logger.hpp +++ b/include/boost/redis/detail/connection_logger.hpp @@ -8,7 +8,6 @@ #define BOOST_REDIS_CONNECTION_LOGGER_HPP #include -#include #include #include @@ -38,7 +37,7 @@ public: void on_ssl_handshake(system::error_code const& ec); void on_write(std::size_t n); void on_read(system::error_code const& ec, std::size_t n); - void on_setup(system::error_code const& ec, generic_response const& resp); + void on_setup(system::error_code const& ec, std::string_view diagnostic); void log(logger::level lvl, std::string_view msg); void log(logger::level lvl, std::string_view msg1, std::string_view msg2); void log(logger::level lvl, std::string_view op, system::error_code const& ec); diff --git a/include/boost/redis/detail/connection_state.hpp b/include/boost/redis/detail/connection_state.hpp index 8e13e43c..addf9f3f 100644 --- a/include/boost/redis/detail/connection_state.hpp +++ b/include/boost/redis/detail/connection_state.hpp @@ -15,6 +15,8 @@ #include #include +#include + namespace boost::redis::detail { // Contains all the members in connection that don't depend on the Executor. @@ -23,7 +25,7 @@ struct connection_state { connection_logger logger; config cfg{}; multiplexer mpx{}; - generic_response setup_resp{}; + std::string setup_diagnostic{}; request ping_req{}; }; diff --git a/include/boost/redis/detail/setup_request_utils.hpp b/include/boost/redis/detail/setup_request_utils.hpp index bd8a3bea..993c5b42 100644 --- a/include/boost/redis/detail/setup_request_utils.hpp +++ b/include/boost/redis/detail/setup_request_utils.hpp @@ -20,9 +20,6 @@ void compose_setup_request(config& cfg); // Completely clears a response void clear_response(generic_response& res); -// Checks that the response to the setup request was successful -system::error_code check_setup_response(system::error_code io_ec, const generic_response&); - } // namespace boost::redis::detail #endif // BOOST_REDIS_RUNNER_HPP diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index f127a469..088b1ac3 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -21,31 +21,6 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix) }); } -system::error_code detail::translate_parallel_group_errors( - std::array order, - system::error_code setup_ec, - system::error_code reader_ec, - system::error_code writer_ec) -{ - // The setup request is special: it might complete successfully, - // without causing the other tasks to exit. - // The other tasks will always complete with an error. - - // If the setup task errored and was the first to exit, use its code - if (order[0] == 0u && setup_ec) { - return setup_ec; - } - - // Use the code corresponding to the task that finished first, - // excluding the setup task - std::size_t task_number = order[0] == 0u ? order[1] : order[0]; - switch (task_number) { - case 1u: return reader_ec; - case 2u: return writer_ec; - default: BOOST_ASSERT(false); return system::error_code(); - } -} - connection::connection(executor_type ex, asio::ssl::context ctx, logger lgr) : impl_{std::move(ex), std::move(ctx), std::move(lgr)} { } diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 968f1705..3ff24e9b 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -142,7 +142,7 @@ void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_ logger_.fn(logger::level::debug, msg_); } -void connection_logger::on_setup(system::error_code const& ec, generic_response const& resp) +void connection_logger::on_setup(system::error_code const& ec, std::string_view diagnostic) { if (logger_.lvl < logger::level::info) return; @@ -150,9 +150,9 @@ void connection_logger::on_setup(system::error_code const& ec, generic_response msg_ = "Setup request execution: "; if (ec) { format_error_code(ec, msg_); - if (resp.has_error()) { + if (!diagnostic.empty()) { msg_ += " ("; - msg_ += resp.error().diagnostic; + msg_ += diagnostic; msg_ += ')'; } } else { diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index dcd7ec67..0ac39952 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -45,7 +45,8 @@ struct error_category_impl : system::error_category { return "Can't receive server push synchronously without blocking."; case error::incompatible_node_depth: return "Incompatible node depth."; case error::resp3_hello: - return "The setup request sent during connection establishment failed."; + return "The server response to the setup request sent during connection establishment " + "contains an error."; case error::unix_sockets_unsupported: return "The configuration specified a UNIX socket address, but UNIX sockets are not " "supported by the system."; diff --git a/include/boost/redis/impl/run_fsm.ipp b/include/boost/redis/impl/run_fsm.ipp index 31954063..bdc0dcd7 100644 --- a/include/boost/redis/impl/run_fsm.ipp +++ b/include/boost/redis/impl/run_fsm.ipp @@ -6,8 +6,11 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include +#include #include +#include #include #include #include @@ -37,6 +40,36 @@ inline void compose_ping_request(const config& cfg, request& to) to.push("PING", cfg.health_check_id); } +inline void process_setup_node( + connection_state& st, + resp3::basic_node const& nd, + system::error_code& ec) +{ + switch (nd.data_type) { + case resp3::type::simple_error: + case resp3::type::blob_error: + case resp3::type::null: + ec = redis::error::resp3_hello; + st.setup_diagnostic = nd.value; + break; + default:; + } +} + +inline any_adapter make_setup_adapter(connection_state& st) +{ + return any_adapter{ + [&st](any_adapter::parse_event evt, resp3::node_view const& nd, system::error_code& ec) { + if (evt == any_adapter::parse_event::node) + process_setup_node(st, nd, ec); + }}; +} + +inline void on_setup_done(const multiplexer::elem& elm, connection_state& st) +{ + st.logger.on_setup(elm.get_error(), st.setup_diagnostic); +} + run_action run_fsm::resume( connection_state& st, system::error_code ec, @@ -74,7 +107,16 @@ run_action run_fsm::resume( if (!ec) { // Initialization st.mpx.reset(); - clear_response(st.setup_resp); + st.setup_diagnostic.clear(); + + // Add the setup request to the multiplexer + if (st.cfg.setup.get_commands() != 0u) { + auto elm = make_elem(st.cfg.setup, make_setup_adapter(st)); + elm->set_done_callback([&elem_ref = *elm, &st] { + on_setup_done(elem_ref, st); + }); + st.mpx.add(elm); + } // Run the tasks BOOST_REDIS_YIELD(resume_point_, 3, run_action_type::parallel_group) diff --git a/include/boost/redis/impl/setup_request_utils.ipp b/include/boost/redis/impl/setup_request_utils.ipp index e1402ec2..c8f562f5 100644 --- a/include/boost/redis/impl/setup_request_utils.ipp +++ b/include/boost/redis/impl/setup_request_utils.ipp @@ -54,15 +54,4 @@ void clear_response(generic_response& res) res.emplace(); } -system::error_code check_setup_response(system::error_code io_ec, const generic_response& resp) -{ - if (io_ec) - return io_ec; - - if (resp.has_error()) - return error::resp3_hello; - - return system::error_code(); -} - } // namespace boost::redis::detail diff --git a/test/test_run_fsm.cpp b/test/test_run_fsm.cpp index cd18a611..117b4e78 100644 --- a/test/test_run_fsm.cpp +++ b/test/test_run_fsm.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -76,14 +77,22 @@ struct fixture : detail::log_fixture { detail::connection_state st; run_fsm fsm; - fixture(config&& cfg = {}) + static config default_config() + { + config res; + res.use_setup = true; + res.setup.clear(); + return res; + } + + fixture(config&& cfg = default_config()) : st{make_logger(), std::move(cfg)} { } }; config config_no_reconnect() { - config res; + auto res = fixture::default_config(); res.reconnect_wait_interval = 0s; return res; } @@ -446,6 +455,91 @@ void test_setup_ping_requests() BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup); } +// We correctly send and log the setup request +void test_setup_request_success() +{ + // Setup + fixture fix; + fix.st.cfg.setup.clear(); + fix.st.cfg.setup.push("HELLO", 3); + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // At this point, the setup request should be already queued. Simulate the writer + BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 1u); + BOOST_TEST(fix.st.mpx.commit_write(fix.st.mpx.get_write_buffer().size())); + + // Simulate a successful read + read(fix.st.mpx, "+OK\r\n"); + error_code ec; + auto res = fix.st.mpx.consume(ec); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST(res.first == detail::consume_result::got_response); + + // Check log + fix.check_log({ + {logger::level::info, "Setup request execution: success"} + }); +} + +// We don't send empty setup requests +void test_setup_request_empty() +{ + // Setup + fixture fix; + fix.st.cfg.setup.clear(); + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // Nothing was added to the multiplexer + BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 0u); + + // Check log + fix.check_log({}); +} + +// A server error would cause the reader to exit +void test_setup_request_server_error() +{ + // Setup + fixture fix; + fix.st.setup_diagnostic = "leftover"; // simulate a leftover from previous runs + fix.st.cfg.setup.clear(); + fix.st.cfg.setup.push("HELLO", 3); + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // At this point, the setup request should be already queued. Simulate the writer + BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 1u); + BOOST_TEST(fix.st.mpx.commit_write(fix.st.mpx.get_write_buffer().size())); + + // Simulate a successful read + read(fix.st.mpx, "-ERR: wrong command\r\n"); + error_code ec; + auto res = fix.st.mpx.consume(ec); + BOOST_TEST_EQ(ec, error::resp3_hello); + BOOST_TEST(res.first == detail::consume_result::got_response); + + // Check log + fix.check_log({ + {logger::level::info, + "Setup request execution: The server response to the setup request sent during connection " + "establishment contains an error. [boost.redis:23] (ERR: wrong command)"} + }); +} + } // namespace int main() @@ -471,5 +565,9 @@ int main() test_several_reconnections(); test_setup_ping_requests(); + test_setup_request_success(); + test_setup_request_empty(); + test_setup_request_server_error(); + return boost::report_errors(); } diff --git a/test/test_setup_request_utils.cpp b/test/test_setup_request_utils.cpp index b9aa1c70..2e2860a3 100644 --- a/test/test_setup_request_utils.cpp +++ b/test/test_setup_request_utils.cpp @@ -22,7 +22,6 @@ namespace asio = boost::asio; namespace redis = boost::redis; using redis::detail::compose_setup_request; using redis::detail::clear_response; -using redis::detail::check_setup_response; using boost::system::error_code; namespace { @@ -174,32 +173,6 @@ void test_clear_response_error() BOOST_TEST_EQ(resp.value().size(), 0u); } -// check response -void test_check_response_success() -{ - redis::generic_response resp; - resp->push_back({}); - auto ec = check_setup_response(error_code(), resp); - BOOST_TEST_EQ(ec, error_code()); -} - -void test_check_response_io_error() -{ - redis::generic_response resp; - auto ec = check_setup_response(asio::error::already_open, resp); - BOOST_TEST_EQ(ec, asio::error::already_open); -} - -void test_check_response_server_error() -{ - redis::generic_response resp{ - boost::system::in_place_error, - redis::adapter::error{redis::resp3::type::simple_error, "wrong password"} - }; - auto ec = check_setup_response(error_code(), resp); - BOOST_TEST_EQ(ec, redis::error::resp3_hello); -} - } // namespace int main() @@ -217,9 +190,5 @@ int main() test_clear_response_nonempty(); test_clear_response_error(); - test_check_response_success(); - test_check_response_io_error(); - test_check_response_server_error(); - return boost::report_errors(); } \ No newline at end of file