diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 9eb3ceca..7a3054eb 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -11,12 +11,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -87,13 +89,7 @@ struct connection_impl { timer_type ping_timer_; // to wait between pings receive_channel_type receive_channel_; asio::cancellation_signal run_signal_; - - config cfg_; - multiplexer mpx_; - connection_logger logger_; - generic_response setup_resp_; - request ping_req_; - generic_response ping_resp_; + connection_state st_; using executor_type = Executor; @@ -140,7 +136,7 @@ struct connection_impl { , reconnect_timer_{ex} , ping_timer_{ex} , receive_channel_{ex, 256} - , logger_{std::move(lgr)} + , st_{std::move(lgr)} { set_receive_adapter(any_adapter{ignore}); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); @@ -149,10 +145,10 @@ struct connection_impl { void cancel(operation op) { switch (op) { - case operation::exec: mpx_.cancel_waiting(); break; + case operation::exec: st_.mpx.cancel_waiting(); break; case operation::receive: receive_channel_.cancel(); break; case operation::reconnection: - cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); + st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); break; case operation::run: case operation::resolve: @@ -160,10 +156,10 @@ struct connection_impl { case operation::ssl_handshake: case operation::health_check: cancel_run(); break; case operation::all: - mpx_.cancel_waiting(); // exec - receive_channel_.cancel(); // receive - cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect - cancel_run(); // run + st_.mpx.cancel_waiting(); // exec + receive_channel_.cancel(); // receive + st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect + cancel_run(); // run break; default: /* 
ignore */; } @@ -188,7 +184,7 @@ struct connection_impl { bool will_reconnect() const noexcept { - return cfg_.reconnect_wait_interval != std::chrono::seconds::zero(); + return st_.cfg.reconnect_wait_interval != std::chrono::seconds::zero(); } template @@ -202,12 +198,15 @@ struct connection_impl { }); return asio::async_compose( - exec_op{this, notifier, exec_fsm(mpx_, std::move(info))}, + exec_op{this, notifier, exec_fsm(st_.mpx, std::move(info))}, token, writer_timer_); } - void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); } + void set_receive_adapter(any_adapter adapter) + { + st_.mpx.set_receive_adapter(std::move(adapter)); + } }; template @@ -217,7 +216,7 @@ struct writer_op { explicit writer_op(connection_impl& conn) noexcept : conn_(&conn) - , fsm_(conn.mpx_, conn.logger_) + , fsm_(conn.st_.mpx, conn.st_.logger) { } template @@ -230,7 +229,7 @@ struct writer_op { case writer_action_type::write: asio::async_write( conn_->stream_, - asio::buffer(conn_->mpx_.get_write_buffer()), + asio::buffer(conn_->st_.mpx.get_write_buffer()), std::move(self)); return; case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return; @@ -246,7 +245,7 @@ struct reader_op { public: reader_op(connection_impl& conn) noexcept : conn_{&conn} - , fsm_{conn.mpx_} + , fsm_{conn.st_.mpx} { } template @@ -255,13 +254,13 @@ public: for (;;) { auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); - conn_->logger_.on_fsm_resume(act); + conn_->st_.logger.on_fsm_resume(act); switch (act.type_) { case reader_fsm::action::type::needs_more: case reader_fsm::action::type::read_some: { - auto const buf = conn_->mpx_.get_prepared_read_buffer(); + auto const buf = conn_->st_.mpx.get_prepared_read_buffer(); conn_->stream_.async_read_some(asio::buffer(buf), std::move(self)); } return; @@ -288,26 +287,26 @@ struct health_checker_op { { // Did we have a cancellation? 
We might not have an error code here if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { - conn_->logger_.log(logger::level::info, "Health checker: cancelled"); + conn_->st_.logger.log(logger::level::info, "Health checker: cancelled"); return asio::error::operation_aborted; } // operation_aborted and no cancel state means that asio::cancel_after timed out if (io_ec == asio::error::operation_aborted) { - conn_->logger_.log(logger::level::info, "Health checker: ping timed out"); + conn_->st_.logger.log(logger::level::info, "Health checker: ping timed out"); return error::pong_timeout; } // Did we have other unknown error? if (io_ec) { - conn_->logger_.log(logger::level::info, "Health checker: ping error", io_ec); + conn_->st_.logger.log(logger::level::info, "Health checker: ping error", io_ec); return io_ec; } // Did the server answer with an error? - if (conn_->ping_resp_.has_error()) { - auto error = conn_->ping_resp_.error(); - conn_->logger_.log( + if (conn_->st_.ping_resp.has_error()) { + auto error = conn_->st_.ping_resp.error(); + conn_->st_.logger.log( logger::level::info, "Health checker: server answered ping with an error", error.diagnostic); @@ -328,8 +327,8 @@ public: { BOOST_ASIO_CORO_REENTER(coro_) { - if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) { - conn_->logger_.trace("ping_op (1): timeout disabled."); + if (conn_->st_.cfg.health_check_interval == std::chrono::seconds::zero()) { + conn_->st_.logger.trace("ping_op (1): timeout disabled."); // Wait until we're cancelled. 
This simplifies parallel group handling a lot conn_->ping_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); @@ -340,16 +339,16 @@ public: for (;;) { // Clean up any previous leftover - clear_response(conn_->ping_resp_); + clear_response(conn_->st_.ping_resp); // Execute the request BOOST_ASIO_CORO_YIELD { auto* conn = conn_; // avoid use-after-move problems - auto timeout = conn->cfg_.health_check_interval; + auto timeout = conn->st_.cfg.health_check_interval; conn->async_exec( - conn->ping_req_, - any_adapter{conn->ping_resp_}, + conn->st_.ping_req, + any_adapter{conn->st_.ping_resp}, asio::cancel_after(conn->ping_timer_, timeout, std::move(self))); } @@ -361,13 +360,13 @@ public: } // Wait before pinging again. - conn_->ping_timer_.expires_after(conn_->cfg_.health_check_interval); + conn_->ping_timer_.expires_after(conn_->st_.cfg.health_check_interval); BOOST_ASIO_CORO_YIELD conn_->ping_timer_.async_wait(std::move(self)); if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (2): cancelled"); + conn_->st_.logger.trace("ping_op (2): cancelled"); self.complete(asio::error::operation_aborted); return; } @@ -376,24 +375,6 @@ public: } }; -inline void compose_ping_request(const config& cfg, request& to) -{ - to.clear(); - to.push("PING", cfg.health_check_id); -} - -inline system::error_code check_config(const config& cfg) -{ - if (!cfg.unix_socket.empty()) { -#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS - return error::unix_sockets_unsupported; -#endif - if (cfg.use_ssl) - return error::unix_sockets_ssl_unsupported; - } - return system::error_code{}; -} - system::error_code translate_parallel_group_errors( std::array order, system::error_code setup_ec, @@ -404,16 +385,15 @@ system::error_code translate_parallel_group_errors( template class run_op { private: - connection_impl* conn_ = nullptr; - asio::coroutine coro_{}; - system::error_code stored_ec_; + connection_impl* conn_; + run_fsm fsm_{}; static system::error_code on_setup_finished( connection_impl& 
conn, system::error_code ec) { - ec = check_setup_response(ec, conn.setup_resp_); - conn.logger_.on_setup(ec, conn.setup_resp_); + ec = check_setup_response(ec, conn.st_.setup_resp); + conn.st_.logger.on_setup(ec, conn.st_.setup_resp); return ec; } @@ -422,11 +402,11 @@ private: { // clang-format off // Skip sending the setup request if it's empty - return asio::deferred_t::when(conn_->cfg_.setup.get_commands() != 0u) + return asio::deferred_t::when(conn_->st_.cfg.setup.get_commands() != 0u) .then( conn_->async_exec( - conn_->cfg_.setup, - any_adapter(conn_->setup_resp_), + conn_->st_.cfg.setup, + any_adapter(conn_->st_.setup_resp), asio::deferred([&conn = *this->conn_](system::error_code ec, std::size_t) { return asio::deferred.values(on_setup_finished(conn, ec)); }) @@ -488,102 +468,45 @@ public: template void operator()(Self& self, system::error_code ec = {}) { - BOOST_ASIO_CORO_REENTER(coro_) - { - // Check config - ec = check_config(conn_->cfg_); - if (ec) { - conn_->logger_.log(logger::level::err, "Invalid configuration", ec); - stored_ec_ = ec; - BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self)); - self.complete(stored_ec_); + auto act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled()); + + switch (act.type) { + case run_action_type::done: self.complete(act.ec); return; + case run_action_type::immediate: + asio::async_immediate(self.get_io_executor(), std::move(self)); return; - } - - // Compose the setup request. This only depends on the config, so it can be done just once - compose_setup_request(conn_->cfg_); - - // Compose the PING request. 
Same as above - compose_ping_request(conn_->cfg_, conn_->ping_req_); - - for (;;) { - // Try to connect - BOOST_ASIO_CORO_YIELD - conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self)); - - // Check for cancellations - if (is_cancelled(self)) { - self.complete(asio::error::operation_aborted); - return; - } - - // If we were successful, run all the connection tasks - if (!ec) { - conn_->mpx_.reset(); - clear_response(conn_->setup_resp_); - - // Note: Order is important here because the writer might - // trigger an async_write before the setup request is sent, - // causing other requests to be sent before the setup request, - // violating the setup request contract. - BOOST_ASIO_CORO_YIELD - asio::experimental::make_parallel_group( - [this](auto token) { - return this->send_setup(token); - }, - [this](auto token) { - return this->health_checker(token); - }, - [this](auto token) { - return this->reader(token); - }, - [this](auto token) { - return this->writer(token); - }) - .async_wait(asio::experimental::wait_for_one_error(), std::move(self)); - - // The parallel group result will be translated into a single error - // code by the specialized operator() overload - - // We've lost connection or otherwise been cancelled. - // Remove from the multiplexer the required requests. - conn_->mpx_.cancel_on_conn_lost(); - - // The receive operation must be cancelled because channel - // subscription does not survive a reconnection but requires - // re-subscription. 
- conn_->receive_channel_.cancel(); - } - - // If we are not going to try again, we're done - if (!conn_->will_reconnect()) { - self.complete(ec); - return; - } - - // Check for cancellations - if (is_cancelled(self)) { - self.complete(asio::error::operation_aborted); - return; - } - - // Wait for the reconnection interval - conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval); - BOOST_ASIO_CORO_YIELD + case run_action_type::connect: + conn_->stream_.async_connect(conn_->st_.cfg, conn_->st_.logger, std::move(self)); + return; + case run_action_type::parallel_group: + // Note: Order is important here because the writer might + // trigger an async_write before the setup request is sent, + // causing other requests to be sent before the setup request, + // violating the setup request contract. + asio::experimental::make_parallel_group( + [this](auto token) { + return this->send_setup(token); + }, + [this](auto token) { + return this->health_checker(token); + }, + [this](auto token) { + return this->reader(token); + }, + [this](auto token) { + return this->writer(token); + }) + .async_wait(asio::experimental::wait_for_one_error(), std::move(self)); + return; + case run_action_type::cancel_receive: + conn_->receive_channel_.cancel(); + (*this)(self); // this action does not require suspending + return; + case run_action_type::wait_for_reconnection: + conn_->reconnect_timer_.expires_after(conn_->st_.cfg.reconnect_wait_interval); conn_->reconnect_timer_.async_wait(std::move(self)); - - // Check for cancellations - if (is_cancelled(self)) { - self.complete(asio::error::operation_aborted); - return; - } - - // If we won't reconnect, exit - if (!conn_->will_reconnect()) { - self.complete(asio::error::operation_aborted); - return; - } - } + return; + default: BOOST_ASSERT(false); } } }; @@ -1076,7 +999,7 @@ public: } /// Returns connection usage information. 
- usage get_usage() const noexcept { return impl_->mpx_.get_usage(); } + usage get_usage() const noexcept { return impl_->st_.mpx.get_usage(); } private: using clock_type = std::chrono::steady_clock; @@ -1092,7 +1015,7 @@ private: // Used by both this class and connection void set_stderr_logger(logger::level lvl, const config& cfg) { - impl_->logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix)); + impl_->st_.logger.reset(detail::make_stderr_logger(lvl, cfg.log_prefix)); } // Initiation for async_run. This is required because we need access @@ -1107,8 +1030,8 @@ private: template void operator()(Handler&& handler, config const* cfg) { - self->cfg_ = *cfg; - self->mpx_.set_config(*cfg); + self->st_.cfg = *cfg; + self->st_.mpx.set_config(*cfg); // If the token's slot has cancellation enabled, it should just emit // the cancellation signal in our connection. This lets us unify the cancel() diff --git a/include/boost/redis/detail/connection_state.hpp b/include/boost/redis/detail/connection_state.hpp new file mode 100644 index 00000000..688c56d5 --- /dev/null +++ b/include/boost/redis/detail/connection_state.hpp @@ -0,0 +1,33 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_CONNECTION_STATE_HPP +#define BOOST_REDIS_CONNECTION_STATE_HPP + +#include +#include +#include +#include +#include + +namespace boost::redis::detail { + +// Contains all the members in connection that don't depend on the Executor. 
+// Makes implementing sans-io algorithms easier +struct connection_state { + connection_logger logger; + config cfg{}; + multiplexer mpx{}; + generic_response setup_resp{}; + request ping_req{}; + generic_response ping_resp{}; +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_CONNECTION_STATE_HPP diff --git a/include/boost/redis/detail/run_fsm.hpp b/include/boost/redis/detail/run_fsm.hpp new file mode 100644 index 00000000..77a11e24 --- /dev/null +++ b/include/boost/redis/detail/run_fsm.hpp @@ -0,0 +1,65 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_RUN_FSM_HPP +#define BOOST_REDIS_RUN_FSM_HPP + +#include + +#include +#include + +// Sans-io algorithm for async_run, as a finite state machine + +namespace boost::redis::detail { + +// Forward decls +class connection_logger; +class multiplexer; + +// What should we do next? 
+enum class run_action_type +{ + done, // Call the final handler + immediate, // Call asio::async_immediate + connect, // Transport connection establishment + parallel_group, // Run the reader, writer and friends + cancel_receive, // Cancel the receiver channel + wait_for_reconnection, // Sleep for the reconnection period +}; + +struct run_action { + run_action_type type; + system::error_code ec; + + run_action(run_action_type type) noexcept + : type{type} + { } + + run_action(system::error_code ec) noexcept + : type{run_action_type::done} + , ec{ec} + { } +}; + +class run_fsm { + int resume_point_{0}; + system::error_code stored_ec_; + +public: + run_fsm() = default; + + run_action resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_RUN_FSM_HPP diff --git a/include/boost/redis/impl/run_fsm.ipp b/include/boost/redis/impl/run_fsm.ipp new file mode 100644 index 00000000..31954063 --- /dev/null +++ b/include/boost/redis/impl/run_fsm.ipp @@ -0,0 +1,125 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include +#include // for BOOST_ASIO_HAS_LOCAL_SOCKETS +#include + +namespace boost::redis::detail { + +inline system::error_code check_config(const config& cfg) +{ + if (!cfg.unix_socket.empty()) { + if (cfg.use_ssl) + return error::unix_sockets_ssl_unsupported; +#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS + return error::unix_sockets_unsupported; +#endif + } + return system::error_code{}; +} + +inline void compose_ping_request(const config& cfg, request& to) +{ + to.clear(); + to.push("PING", cfg.health_check_id); +} + +run_action run_fsm::resume( + connection_state& st, + system::error_code ec, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + // Check config + ec = check_config(st.cfg); + if (ec) { + st.logger.log(logger::level::err, "Invalid configuration", ec); + stored_ec_ = ec; + BOOST_REDIS_YIELD(resume_point_, 1, run_action_type::immediate) + return stored_ec_; + } + + // Compose the setup request. This only depends on the config, so it can be done just once + compose_setup_request(st.cfg); + + // Compose the PING request. Same as above + compose_ping_request(st.cfg, st.ping_req); + + for (;;) { + // Try to connect + BOOST_REDIS_YIELD(resume_point_, 2, run_action_type::connect) + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Run: cancelled (1)"); + return system::error_code(asio::error::operation_aborted); + } + + // If we were successful, run all the connection tasks + if (!ec) { + // Initialization + st.mpx.reset(); + clear_response(st.setup_resp); + + // Run the tasks + BOOST_REDIS_YIELD(resume_point_, 3, run_action_type::parallel_group) + + // Store any error yielded by the tasks for later + stored_ec_ = ec; + + // We've lost connection or otherwise been cancelled. 
+ // Remove from the multiplexer the required requests. + st.mpx.cancel_on_conn_lost(); + + // The receive operation must be cancelled because channel + // subscription does not survive a reconnection but requires + // re-subscription. + BOOST_REDIS_YIELD(resume_point_, 4, run_action_type::cancel_receive) + + // Restore the error + ec = stored_ec_; + } + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Run: cancelled (2)"); + return system::error_code(asio::error::operation_aborted); + } + + // If we are not going to try again, we're done + if (st.cfg.reconnect_wait_interval.count() == 0) { + return ec; + } + + // Wait for the reconnection interval + BOOST_REDIS_YIELD(resume_point_, 5, run_action_type::wait_for_reconnection) + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + st.logger.trace("Run: cancelled (3)"); + return system::error_code(asio::error::operation_aborted); + } + } + } + + // We should never get here + BOOST_ASSERT(false); + return system::error_code(); +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 715fd08a..a8a9dee2 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index cdabb61d..78a65ee1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -43,6 +43,7 @@ make_test(test_conn_logging) make_test(test_writer_fsm) make_test(test_reader_fsm) make_test(test_connect_fsm) +make_test(test_run_fsm) make_test(test_setup_request_utils) make_test(test_multiplexer) diff --git a/test/Jamfile b/test/Jamfile index 42245848..8e607279 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -59,6 +59,7 @@ local tests = test_conn_logging test_writer_fsm test_reader_fsm + test_run_fsm test_connect_fsm test_setup_request_utils test_multiplexer diff --git 
a/test/sansio_utils.cpp b/test/sansio_utils.cpp index 3851a01c..294763e5 100644 --- a/test/sansio_utils.cpp +++ b/test/sansio_utils.cpp @@ -57,12 +57,6 @@ std::ostream& operator<<(std::ostream& os, const log_message& v) return os << "log_message { .lvl=" << to_string(v.lvl) << ", .msg=" << v.msg << " }"; } -log_fixture::log_fixture() -: lgr{logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { - msgs.push_back({lvl, std::string(msg)}); -})} -{ } - void log_fixture::check_log(std::initializer_list expected, source_location loc) const { @@ -71,4 +65,11 @@ void log_fixture::check_log(std::initializer_list expected, s } } +logger log_fixture::make_logger() +{ + return logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { + msgs.push_back({lvl, std::string(msg)}); + }); +} + } // namespace boost::redis::detail diff --git a/test/sansio_utils.hpp b/test/sansio_utils.hpp index 7473a354..7428b116 100644 --- a/test/sansio_utils.hpp +++ b/test/sansio_utils.hpp @@ -7,7 +7,6 @@ #ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP #define BOOST_REDIS_TEST_SANSIO_UTILS_HPP -#include #include #include @@ -38,12 +37,11 @@ struct log_message { struct log_fixture { std::vector msgs; - detail::connection_logger lgr; - log_fixture(); void check_log( std::initializer_list expected, source_location loc = BOOST_CURRENT_LOCATION) const; + logger make_logger(); }; } // namespace boost::redis::detail diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 73c5767e..641f67cf 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -105,6 +105,7 @@ auto resolver_data = [] { // Reduce duplication struct fixture : detail::log_fixture { config cfg; + connection_logger lgr{make_logger()}; connect_fsm fsm{cfg, lgr}; redis_stream_state st{}; diff --git a/test/test_run_fsm.cpp b/test/test_run_fsm.cpp new file mode 100644 index 00000000..cd18a611 --- /dev/null +++ b/test/test_run_fsm.cpp @@ -0,0 +1,475 @@ +// +// Copyright (c) 2025 
Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include // for BOOST_ASIO_HAS_LOCAL_SOCKETS +#include +#include + +#include "sansio_utils.hpp" + +#include +#include + +using namespace boost::redis; +namespace asio = boost::asio; +using detail::run_fsm; +using detail::multiplexer; +using detail::run_action_type; +using detail::run_action; +using boost::system::error_code; +using boost::asio::cancellation_type_t; +using detail::connection_logger; +using namespace std::chrono_literals; + +// Operators +static const char* to_string(run_action_type value) +{ + switch (value) { + case run_action_type::done: return "run_action_type::done"; + case run_action_type::immediate: return "run_action_type::immediate"; + case run_action_type::connect: return "run_action_type::connect"; + case run_action_type::parallel_group: return "run_action_type::parallel_group"; + case run_action_type::cancel_receive: return "run_action_type::cancel_receive"; + case run_action_type::wait_for_reconnection: return "run_action_type::wait_for_reconnection"; + default: return ""; + } +} + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, run_action_type type) +{ + os << to_string(type); + return os; +} + +bool operator==(const run_action& lhs, const run_action& rhs) noexcept +{ + return lhs.type == rhs.type && lhs.ec == rhs.ec; +} + +std::ostream& operator<<(std::ostream& os, const run_action& act) +{ + os << "run_action{ .type=" << act.type; + if (act.type == run_action_type::done) + os << ", .error=" << act.ec; + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +struct fixture : detail::log_fixture { + detail::connection_state st; + run_fsm fsm; + + 
fixture(config&& cfg = {}) + : st{make_logger(), std::move(cfg)} + { } +}; + +config config_no_reconnect() +{ + config res; + res.reconnect_wait_interval = 0s; + return res; +} + +// Config errors +#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS +void test_config_error_unix() +{ + // Setup + config cfg; + cfg.unix_socket = "/var/sock"; + fixture fix{std::move(cfg)}; + + // Launching the operation fails immediately + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::immediate); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::unix_sockets_unsupported)); + + // Log + fix.check_log({ + {logger::level::err, + "Invalid configuration: The configuration specified a UNIX socket address, but UNIX sockets " + "are not supported by the system. [boost.redis:24]"}, + }); +} +#endif + +void test_config_error_unix_ssl() +{ + // Setup + config cfg; + cfg.use_ssl = true; + cfg.unix_socket = "/var/sock"; + fixture fix{std::move(cfg)}; + + // Launching the operation fails immediately + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::immediate); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::unix_sockets_ssl_unsupported)); + + // Log + fix.check_log({ + {logger::level::err, + "Invalid configuration: The configuration specified UNIX sockets with SSL, which is not " + "supported. [boost.redis:25]"}, + }); +} + +// An error in connect with reconnection enabled triggers a reconnection +void test_connect_error() +{ + // Setup + fixture fix; + + // Launch the operation + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + + // Connect errors. 
We sleep and try to connect again + act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + + // This time we succeed and we launch the parallel group + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // Run doesn't log, it's the subordinate tasks that do + fix.check_log({}); +} + +// An error in connect without reconnection enabled makes the operation finish +void test_connect_error_no_reconnect() +{ + // Setup + fixture fix{config_no_reconnect()}; + + // Launch the operation + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + + // Connect errors. The operation finishes + act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::connect_timeout)); + + // Run doesn't log, it's the subordinate tasks that do + fix.check_log({}); +} + +// A cancellation in connect makes the operation finish even with reconnection enabled +void test_connect_cancel() +{ + // Setup + fixture fix; + + // Launch the operation + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + + // Connect cancelled. 
The operation finishes + act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (1)"} + }); +} + +// Same, but only the cancellation is set +void test_connect_cancel_edge() +{ + // Setup + fixture fix; + + // Launch the operation + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + + // Connect cancelled. The operation finishes + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (1)"} + }); +} + +// An error in the parallel group triggers a reconnection +// (the parallel group always exits with an error) +void test_parallel_group_error() +{ + // Setup + fixture fix; + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits with an error. 
We sleep and connect again + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // Run doesn't log, it's the subordinate tasks that do + fix.check_log({}); +} + +// An error in the parallel group makes the operation exit if reconnection is disabled +void test_parallel_group_error_no_reconnect() +{ + // Setup + fixture fix{config_no_reconnect()}; + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits with an error. We cancel the receive operation and exit with that error + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + + // Run doesn't log, it's the subordinate tasks that do, so we expect an empty log + fix.check_log({}); +} + +// A cancellation in the parallel group makes it exit, even if reconnection is enabled. +// Parallel group tasks always exit with an error, so there is no edge case here +void test_parallel_group_cancel() +{ + // Setup + fixture fix; + + // Run the operation. 
We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits because the operation gets cancelled. Any receive operation gets cancelled + act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (2)"} + }); +} + +void test_parallel_group_cancel_no_reconnect() +{ + // Setup. Same scenario as test_parallel_group_cancel, but with reconnection disabled + fixture fix{config_no_reconnect()}; + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits because the operation gets cancelled. Any receive operation gets cancelled + act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (2)"} + }); +} + +// If the reconnection wait gets cancelled, we exit +void test_wait_cancel() +{ + // Setup + fixture fix; + + // Run the operation. 
We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits with an error. We sleep + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + + // We get cancelled during the sleep + act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (3)"} + }); +} + +void test_wait_cancel_edge() +{ + // Setup + fixture fix; + + // Run the operation. We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits with an error. We sleep + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + + // Edge case: the sleep completes without error, but a terminal cancellation was requested + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // We log on cancellation only + fix.check_log({ + {logger::level::debug, "Run: cancelled (3)"} + }); +} + +void test_several_reconnections() +{ + // Setup. 
We chain a connect failure, a successful run, a task failure and a cancellation + fixture fix; + + // Run the operation. 
Connect errors and we sleep + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + + // Connect again, this time successfully. We launch the tasks + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // This exits with an error. We sleep and connect again + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // Exit with cancellation + act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + + // The cancellation was logged + fix.check_log({ + {logger::level::debug, "Run: cancelled (2)"} + }); +} + +// Setup and ping requests are only composed once at startup +void test_setup_ping_requests() +{ + // Setup: custom health check ID and credentials, empty client name + config cfg; + cfg.health_check_id = "some_value"; + cfg.username = "foo"; + cfg.password = "bar"; + cfg.clientname = ""; + fixture fix{std::move(cfg)}; + + // Run the operation. 
We connect and launch the tasks + auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // At this point, the requests are set up + const std::string_view expected_ping = "*2\r\n$4\r\nPING\r\n$10\r\nsome_value\r\n"; + const std::string_view + expected_setup = "*5\r\n$5\r\nHELLO\r\n$1\r\n3\r\n$4\r\nAUTH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"; + BOOST_TEST_EQ(fix.st.ping_req.payload(), expected_ping); + BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup); + + // Reconnect + act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::cancel_receive); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::connect); + act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none); + BOOST_TEST_EQ(act, run_action_type::parallel_group); + + // The requests haven't been modified by the reconnection + BOOST_TEST_EQ(fix.st.ping_req.payload(), expected_ping); + BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup); +} + +} // namespace + +int main() +{ +#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS // Only built when Asio lacks UNIX socket support + test_config_error_unix(); +#endif + test_config_error_unix_ssl(); + + test_connect_error(); + test_connect_error_no_reconnect(); + test_connect_cancel(); + test_connect_cancel_edge(); + + test_parallel_group_error(); + test_parallel_group_error_no_reconnect(); + test_parallel_group_cancel(); + test_parallel_group_cancel_no_reconnect(); + + test_wait_cancel(); + test_wait_cancel_edge(); + + test_several_reconnections(); + test_setup_ping_requests(); + + return boost::report_errors(); +} diff --git 
a/test/test_writer_fsm.cpp b/test/test_writer_fsm.cpp index 
96e2a9ba..61674693 100644 --- a/test/test_writer_fsm.cpp +++ b/test/test_writer_fsm.cpp @@ -90,6 +90,7 @@ struct test_elem { struct fixture : detail::log_fixture { multiplexer mpx; + connection_logger lgr{make_logger()}; writer_fsm fsm{mpx, lgr}; };